From 4ca3c67bd3b29e2ab28a9cd9356f649a96c78923 Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Sun, 12 May 2013 02:36:59 -0700 Subject: [PATCH] [RocksDB] Cleanup compaction filter to use a class interface, instead of function pointer and additional context pointer. Summary: This diff replaces compaction_filter_args and CompactionFilter with a single compaction_filter parameter. It gives CompactionFilter better encapsulation and a similar look to Comparator and MergeOpertor, which improves consistency of the overall interface. The change is not backward compatible. Nevertheless, the two references in fbcode are not in production yet. Test Plan: make check Reviewers: dhruba Reviewed By: dhruba CC: leveldb, zshao Differential Revision: https://reviews.facebook.net/D10773 --- db/db_impl.cc | 12 ++-- db/db_test.cc | 85 ++++++++++++++++++++--------- include/leveldb/compaction_filter.h | 46 ++++++++++++++++ include/leveldb/options.h | 28 ++-------- util/options.cc | 10 ++-- utilities/ttl/db_ttl.cc | 17 +++--- utilities/ttl/db_ttl.h | 17 +++--- 7 files changed, 140 insertions(+), 75 deletions(-) create mode 100644 include/leveldb/compaction_filter.h diff --git a/db/db_impl.cc b/db/db_impl.cc index a6a884d12..1183ef128 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -26,6 +26,7 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "db/transaction_log_iterator_impl.h" +#include "leveldb/compaction_filter.h" #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/merge_operator.h" @@ -1701,7 +1702,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { key = merge.key(); ParseInternalKey(key, &ikey); value = merge.value(); - } else if (options_.CompactionFilter != nullptr && + } else if (options_.compaction_filter && ikey.type != kTypeDeletion && visible_at_tip) { // If the user has specified a compaction filter and there are no @@ -1712,11 +1713,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { assert(!drop); bool value_changed = false; compaction_filter_value.clear(); - drop = options_.CompactionFilter(options_.compaction_filter_args, - compact->compaction->level(), - ikey.user_key, value, - &compaction_filter_value, - &value_changed); + drop = options_.compaction_filter->Filter(compact->compaction->level(), + ikey.user_key, value, + &compaction_filter_value, + &value_changed); // Another example of statistics update without holding the lock // TODO: clean it up if (drop) { diff --git a/db/db_test.cc b/db/db_test.cc index 6cb42b8d4..e5d90f19b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12,6 +12,7 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "leveldb/cache.h" +#include "leveldb/compaction_filter.h" #include "leveldb/env.h" #include "table/table.h" #include "util/hash.h" @@ -1369,35 +1370,64 @@ TEST(DBTest, RepeatedWritesToSameKey) { // kvs during the compaction process. static int cfilter_count; static std::string NEW_VALUE = "NewValue"; -static bool keep_filter(void* arg, int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) { - assert(arg == nullptr); - cfilter_count++; - return false; -} -static bool delete_filter(void*argv, int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) { - assert(argv == nullptr); - cfilter_count++; - return true; -} -static bool change_filter(void*argv, int level, const Slice& key, - const Slice& value, std::string* new_value, - bool* value_changed) { - assert(argv == (void*)100); - assert(new_value != nullptr); - *new_value = NEW_VALUE; - *value_changed = true; - return false; -} + +class KeepFilter : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, + const Slice& value, std::string* new_value, + bool* value_changed) const override { + cfilter_count++; + return false; + } + + virtual const char* Name() const override { + return "KeepFilter"; + } + +}; + +class DeleteFilter : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, + const Slice& value, std::string* new_value, + bool* value_changed) const override { + cfilter_count++; + return true; + } + + virtual const char* Name() const override { + return "DeleteFilter"; + } +}; + +class ChangeFilter : public CompactionFilter { + public: + ChangeFilter(int argv) : argv_(argv) {} + + virtual bool Filter(int level, const Slice& key, + const Slice& value, std::string* new_value, + bool* value_changed) const override { + assert(argv_ == 100); + assert(new_value != nullptr); + *new_value = NEW_VALUE; + *value_changed = true; + return false; + } + + virtual const char* Name() const override { + return "ChangeFilter"; + } + + private: + const int argv_; +}; TEST(DBTest, CompactionFilter) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - options.CompactionFilter = keep_filter; + auto keep_filter = std::make_shared(); + options.compaction_filter = keep_filter.get(); Reopen(&options); // Write 100K keys, these are written to a few files in L0. @@ -1472,7 +1502,8 @@ TEST(DBTest, CompactionFilter) { // create a new database with the compaction // filter in such a way that it deletes all keys - options.CompactionFilter = delete_filter; + auto delete_filter = std::make_shared(); + options.compaction_filter = delete_filter.get(); options.create_if_missing = true; DestroyAndReopen(&options); @@ -1535,8 +1566,8 @@ TEST(DBTest, CompactionFilterWithValueChange) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - options.compaction_filter_args = (void *)100; - options.CompactionFilter = change_filter; + auto change_filter = std::make_shared(100); + options.compaction_filter = change_filter.get(); Reopen(&options); // Write 100K+1 keys, these are written to a few files diff --git a/include/leveldb/compaction_filter.h b/include/leveldb/compaction_filter.h new file mode 100644 index 000000000..73614742e --- /dev/null +++ b/include/leveldb/compaction_filter.h @@ -0,0 +1,46 @@ +// Copyright (c) 2013 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ +#define STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ + +#include + +namespace leveldb { + +class Slice; + +// CompactionFilter allows an application to modify/delete a key-value at +// the time of compaction. + +class CompactionFilter { + public: + virtual ~CompactionFilter() {} + + + // The compaction process invokes this + // method for kv that is being compacted. A return value + // of false indicates that the kv should be preserved in the + // output of this compaction run and a return value of true + // indicates that this key-value should be removed from the + // output of the compaction. The application can inspect + // the existing value of the key and make decision based on it. + // + // When the value is to be preserved, the application has the option + // to modify the existing_value and pass it back through new_value. + // value_changed needs to be set to true in this case. + virtual bool Filter(int level, + const Slice& key, + const Slice& existing_value, + std::string* new_value, + bool* value_changed) const = 0; + + // Returns a name that identifies this compaction filter. + // The name will be printed to LOG file on start up for diagnosis. + virtual const char* Name() const = 0; +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 557303138..e435e5865 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -22,6 +22,7 @@ class FilterPolicy; class Logger; class MergeOperator; class Snapshot; +class CompactionFilter; using std::shared_ptr; @@ -76,6 +77,11 @@ struct Options { // Default: nullptr const MergeOperator* merge_operator; + // Allows an application to modify/delete a key-value during background + // compaction. + // Default: nullptr + const CompactionFilter* compaction_filter; + // If true, the database will be created if it is missing. // Default: false bool create_if_missing; @@ -367,28 +373,6 @@ struct Options { // from the database, because otherwise the read can be very slow. Options* PrepareForBulkLoad(); - // This method allows an application to modify/delete a key-value at - // the time of compaction. The compaction process invokes this - // method for kv that is being compacted. A return value - // of false indicates that the kv should be preserved in the - // output of this compaction run and a return value of true - // indicates that this key-value should be removed from the - // output of the compaction. The application can inspect - // the existing value of the key and make decision based on it. - - // When the value is to be preserved, the application has the option - // to modify the existing_value and pass it back through new_value. - // value_changed needs to be set to true in this case. - - // The compaction_filter_args, if specified here, are passed - // back to the invocation of the CompactionFilter. - void* compaction_filter_args; - bool (*CompactionFilter)(void* compaction_filter_args, - int level, const Slice& key, - const Slice& existing_value, - std::string* new_value, - bool* value_changed); - // Disable automatic compactions. Manual compactions can still // be issued on this database. bool disable_auto_compactions; diff --git a/util/options.cc b/util/options.cc index 0fa80a06b..689aa3df2 100644 --- a/util/options.cc +++ b/util/options.cc @@ -7,6 +7,7 @@ #include #include "leveldb/cache.h" +#include "leveldb/compaction_filter.h" #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/filter_policy.h" @@ -17,6 +18,7 @@ namespace leveldb { Options::Options() : comparator(BytewiseComparator()), merge_operator(nullptr), + compaction_filter(nullptr), create_if_missing(false), error_if_exists(false), paranoid_checks(false), @@ -56,8 +58,6 @@ Options::Options() max_manifest_file_size(std::numeric_limits::max()), no_block_cache(false), table_cache_numshardbits(4), - compaction_filter_args(nullptr), - CompactionFilter(nullptr), disable_auto_compactions(false), WAL_ttl_seconds(0), manifest_preallocation_size(4 * 1024 * 1024), @@ -76,6 +76,8 @@ Options::Dump(Logger* log) const Log(log," Options.comparator: %s", comparator->Name()); Log(log," Options.merge_operator: %s", merge_operator? merge_operator->Name() : "None"); + Log(log," Options.compaction_filter: %s", + compaction_filter? compaction_filter->Name() : "None"); Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.paranoid_checks: %d", paranoid_checks); Log(log," Options.env: %p", env); @@ -162,10 +164,6 @@ Options::Dump(Logger* log) const rate_limit); Log(log," Options.rate_limit_delay_milliseconds: %d", rate_limit_delay_milliseconds); - Log(log," Options.compaction_filter_args: %p", - compaction_filter_args); - Log(log," Options.CompactionFilter: %p", - CompactionFilter); Log(log," Options.disable_auto_compactions: %d", disable_auto_compactions); Log(log," Options.WAL_ttl_seconds: %ld", diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 949140c58..9e628b8ad 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -75,10 +75,9 @@ DBWithTTL::DBWithTTL(const int32_t ttl, Status& st, bool read_only) : ttl_(ttl) { - assert(options.CompactionFilter == nullptr); + assert(options.compaction_filter == nullptr); Options options_to_open = options; - options_to_open.compaction_filter_args = &ttl_; - options_to_open.CompactionFilter = DeleteByTS; + options_to_open.compaction_filter = this; if (read_only) { st = DB::OpenForReadOnly(options_to_open, dbname, &db_); } else { @@ -105,16 +104,20 @@ Status UtilityDB::OpenTtlDB( } // returns true(i.e. key-value to be deleted) if its TS has expired based on ttl -bool DBWithTTL::DeleteByTS( - void* args, +bool DBWithTTL::Filter( int level, const Slice& key, const Slice& old_val, std::string* new_val, - bool* value_changed) { - return IsStale(old_val, *(int32_t*)args); + bool* value_changed) const { + return IsStale(old_val, ttl_); } +const char* DBWithTTL::Name() const { + return "Delete By TTL"; +} + + // Gives back the current time Status DBWithTTL::GetCurrentTime(int32_t& curtime) { return Env::Default()->GetCurrentTime((int64_t*)&curtime); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index b36573b87..ecae9edc1 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -6,11 +6,12 @@ #define LEVELDB_UTILITIES_TTL_DB_TTL_H_ #include "include/leveldb/db.h" +#include "include/leveldb/compaction_filter.h" #include "db/db_impl.h" namespace leveldb { -class DBWithTTL : public DB { +class DBWithTTL : public DB, CompactionFilter { public: DBWithTTL(const int32_t ttl, const Options& options, @@ -71,12 +72,14 @@ class DBWithTTL : public DB { // Simulate a db crash, no elegant closing of database. void TEST_Destroy_DBWithTtl(); - static bool DeleteByTS(void* args, - int level, - const Slice& key, - const Slice& old_val, - std::string* new_val, - bool* value_changed); + // The following two methods are for CompactionFilter + virtual bool Filter(int level, + const Slice& key, + const Slice& old_val, + std::string* new_val, + bool* value_changed) const override; + + virtual const char* Name() const override; static bool IsStale(const Slice& value, int32_t ttl);