diff --git a/db/db_impl.cc b/db/db_impl.cc index a91c58de0..6083746fa 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -158,6 +158,10 @@ Options SanitizeOptions(const std::string& dbname, if (result.soft_rate_limit > result.hard_rate_limit) { result.soft_rate_limit = result.hard_rate_limit; } + if (result.compaction_filter && + result.compaction_filter_factory->CreateCompactionFilter().get()) { + Log(result.info_log, "Both filter and factory specified. Using filter"); + } return result; } @@ -1784,6 +1788,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { MergeHelper merge(user_comparator(), options_.merge_operator, options_.info_log.get(), false /* internal key corruption is expected */); + auto compaction_filter = options_.compaction_filter; + std::unique_ptr compaction_filter_from_factory = nullptr; + if (!compaction_filter) { + compaction_filter_from_factory = std::move( + options_.compaction_filter_factory->CreateCompactionFilter()); + compaction_filter = compaction_filter_from_factory.get(); + } for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { @@ -1830,7 +1841,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { visible_in_snapshot = kMaxSequenceNumber; // apply the compaction filter to the first occurrence of the user key - if (options_.compaction_filter && + if (compaction_filter && ikey.type == kTypeValue && (visible_at_tip || ikey.sequence > latest_snapshot)) { // If the user has specified a compaction filter and the sequence @@ -1841,7 +1852,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { bool value_changed = false; compaction_filter_value.clear(); bool to_delete = - options_.compaction_filter->Filter(compact->compaction->level(), + compaction_filter->Filter(compact->compaction->level(), ikey.user_key, value, &compaction_filter_value, &value_changed); diff --git a/db/db_test.cc b/db/db_test.cc index 3a3e50cc5..18f2e8832 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1698,12 +1698,52 @@ class ChangeFilter : public CompactionFilter { const int argv_; }; +class KeepFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(new KeepFilter()); + } + + virtual const char* Name() const override { + return "KeepFilterFactory"; + } +}; + +class DeleteFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(new DeleteFilter()); + } + + virtual const char* Name() const override { + return "DeleteFilterFactory"; + } +}; + +class ChangeFilterFactory : public CompactionFilterFactory { + public: + explicit ChangeFilterFactory(int argv) : argv_(argv) {} + + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(new ChangeFilter(argv_)); + } + + virtual const char* Name() const override { + return "ChangeFilterFactory"; + } + + private: + const int argv_; +}; + TEST(DBTest, CompactionFilter) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - auto keep_filter = std::make_shared(); - options.compaction_filter = keep_filter.get(); + options.compaction_filter_factory = std::make_shared(); Reopen(&options); // Write 100K keys, these are written to a few files in L0. @@ -1778,8 +1818,7 @@ TEST(DBTest, CompactionFilter) { // create a new database with the compaction // filter in such a way that it deletes all keys - auto delete_filter = std::make_shared(); - options.compaction_filter = delete_filter.get(); + options.compaction_filter_factory = std::make_shared(); options.create_if_missing = true; DestroyAndReopen(&options); @@ -1843,8 +1882,8 @@ TEST(DBTest, CompactionFilterWithValueChange) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - auto change_filter = std::make_shared(100); - options.compaction_filter = change_filter.get(); + options.compaction_filter_factory = + std::make_shared(100); Reopen(&options); // Write 100K+1 keys, these are written to a few files diff --git a/include/leveldb/compaction_filter.h b/include/leveldb/compaction_filter.h index 73614742e..54a9b39ed 100644 --- a/include/leveldb/compaction_filter.h +++ b/include/leveldb/compaction_filter.h @@ -41,6 +41,31 @@ class CompactionFilter { virtual const char* Name() const = 0; }; +// Each compaction will create a new CompactionFilter allowing the +// application to know about different campactions +class CompactionFilterFactory { + public: + virtual ~CompactionFilterFactory() { }; + virtual std::unique_ptr CreateCompactionFilter() = 0; + + // Returns a name that identifies this compaction filter factory. + virtual const char* Name() const = 0; +}; + +// Default implementaion of CompactionFilterFactory which does not +// return any filter +class DefaultCompactionFilterFactory : public CompactionFilterFactory { + public: + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr(nullptr); + } + + virtual const char* Name() const override { + return "DefaultCompactionFilterFactory"; + } +}; + } // namespace leveldb #endif // STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 52b6a2058..30a8dc328 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -25,6 +25,7 @@ class Logger; class MergeOperator; class Snapshot; class CompactionFilter; +class CompactionFilterFactory; using std::shared_ptr; @@ -84,8 +85,13 @@ struct Options { // Default: nullptr const MergeOperator* merge_operator; + // The client must provide compaction_filter_factory if it requires a new + // compaction filter to be used for different compaction processes // Allows an application to modify/delete a key-value during background // compaction. + // Ideally, client should specify only one of filter or factory. + // compaction_filter takes precedence over compaction_filter_factory if + // client specifies both. // Default: nullptr const CompactionFilter* compaction_filter; @@ -506,6 +512,11 @@ struct Options { // Default: a factory that provides a skip-list-based implementation of // MemTableRep. std::shared_ptr memtable_factory; + + // This is a factory that provides compaction filter objects which allow + // an application to modify/delete a key-value during background compaction. + // Default: a factory that doesn't provide any object + std::shared_ptr compaction_filter_factory; }; // Options that control read operations diff --git a/util/options.cc b/util/options.cc index 127fe8181..cc5653326 100644 --- a/util/options.cc +++ b/util/options.cc @@ -80,7 +80,11 @@ Options::Options() bytes_per_sync(0), compaction_style(kCompactionStyleLevel), filter_deletes(false), - memtable_factory(std::shared_ptr(new SkipListFactory)) { + memtable_factory(std::shared_ptr(new SkipListFactory)), + compaction_filter_factory( + std::shared_ptr( + new DefaultCompactionFilterFactory())) { + assert(memtable_factory.get() != nullptr); } @@ -96,6 +100,8 @@ Options::Dump(Logger* log) const merge_operator? merge_operator->Name() : "None"); Log(log," Options.compaction_filter: %s", compaction_filter? compaction_filter->Name() : "None"); + Log(log," Options.compaction_filter_factory: %s", + compaction_filter_factory->Name()); Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.paranoid_checks: %d", paranoid_checks); Log(log," Options.env: %p", env); diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 4b643239a..0791cdc87 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -20,9 +20,16 @@ DBWithTTL::DBWithTTL(const int32_t ttl, ttl_(ttl) { Options options_to_open = options; - ttl_comp_filter_.reset(new TtlCompactionFilter(ttl, - options.compaction_filter)); - options_to_open.compaction_filter = ttl_comp_filter_.get(); + if (options.compaction_filter) { + ttl_comp_filter_.reset( + new TtlCompactionFilter(ttl, options.compaction_filter)); + options_to_open.compaction_filter = ttl_comp_filter_.get(); + } else { + options_to_open.compaction_filter_factory = + std::shared_ptr( + new TtlCompactionFilterFactory( + ttl, options.compaction_filter_factory)); + } if (options.merge_operator) { ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator)); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index dae0ba849..dcc41bce5 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -176,11 +176,19 @@ class TtlIterator : public Iterator { class TtlCompactionFilter : public CompactionFilter { public: - TtlCompactionFilter(int32_t ttl, const CompactionFilter* comp_filter) + TtlCompactionFilter( + int32_t ttl, + const CompactionFilter* user_comp_filter, + std::unique_ptr + user_comp_filter_from_factory = nullptr) : ttl_(ttl), - user_comp_filter_(comp_filter) { + user_comp_filter_(user_comp_filter), + user_comp_filter_from_factory_(std::move(user_comp_filter_from_factory)) { // Unlike the merge operator, compaction filter is necessary for TTL, hence // this would be called even if user doesn't specify any compaction-filter + if (!user_comp_filter_) { + user_comp_filter_ = user_comp_filter_from_factory_.get(); + } } virtual bool Filter(int level, @@ -215,6 +223,32 @@ class TtlCompactionFilter : public CompactionFilter { private: int32_t ttl_; const CompactionFilter* user_comp_filter_; + std::unique_ptr user_comp_filter_from_factory_; +}; + +class TtlCompactionFilterFactory : public CompactionFilterFactory { + public: + TtlCompactionFilterFactory( + int32_t ttl, + std::shared_ptr comp_filter_factory) + : ttl_(ttl), + user_comp_filter_factory_(comp_filter_factory) { } + + virtual std::unique_ptr CreateCompactionFilter() { + return std::unique_ptr( + new TtlCompactionFilter( + ttl_, + nullptr, + std::move(user_comp_filter_factory_->CreateCompactionFilter()))); + } + + virtual const char* Name() const override { + return "TtlCompactionFilterFactory"; + } + + private: + int32_t ttl_; + std::shared_ptr user_comp_filter_factory_; }; class TtlMergeOperator : public MergeOperator { diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index ba232001c..3d357e0cf 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include "leveldb/compaction_filter.h" #include "utilities/utility_db.h" #include "util/testharness.h" @@ -54,8 +55,9 @@ class TtlTest { // Open with TestFilter compaction filter void OpenTtlWithTestCompaction(int32_t ttl) { - test_comp_filter_.reset(new TestFilter(kSampleSize_, kNewValue_)); - options_.compaction_filter = test_comp_filter_.get(); + options_.compaction_filter_factory = + std::shared_ptr( + new TestFilterFactory(kSampleSize_, kNewValue_)); OpenTtl(ttl); } @@ -252,6 +254,29 @@ class TtlTest { const std::string kNewValue_; }; + class TestFilterFactory : public CompactionFilterFactory { + public: + TestFilterFactory(const int64_t kSampleSize, const std::string kNewValue) + : kSampleSize_(kSampleSize), + kNewValue_(kNewValue) { + } + + virtual std::unique_ptr + CreateCompactionFilter() override { + return std::unique_ptr( + new TestFilter(kSampleSize_, kNewValue_)); + } + + virtual const char* Name() const override { + return "TestFilterFactory"; + } + + private: + const int64_t kSampleSize_; + const std::string kNewValue_; + }; + + // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer const int64_t kSampleSize_ = 100;