[RocksDB] Cleanup compaction filter to use a class interface, instead of function pointer and additional context pointer.

Summary:
This diff replaces compaction_filter_args and CompactionFilter with a single compaction_filter parameter. It gives CompactionFilter better encapsulation and a similar look to Comparator and MergeOpertor, which improves consistency of the overall interface.
The change is not backward compatible. Nevertheless, the two references in fbcode are not in production yet.

Test Plan: make check

Reviewers: dhruba

Reviewed By: dhruba

CC: leveldb, zshao

Differential Revision: https://reviews.facebook.net/D10773
main
Haobo Xu 11 years ago
parent 73c0a33346
commit 4ca3c67bd3
  1. 12
      db/db_impl.cc
  2. 85
      db/db_test.cc
  3. 46
      include/leveldb/compaction_filter.h
  4. 28
      include/leveldb/options.h
  5. 10
      util/options.cc
  6. 17
      utilities/ttl/db_ttl.cc
  7. 17
      utilities/ttl/db_ttl.h

@ -26,6 +26,7 @@
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "db/transaction_log_iterator_impl.h" #include "db/transaction_log_iterator_impl.h"
#include "leveldb/compaction_filter.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/merge_operator.h" #include "leveldb/merge_operator.h"
@ -1701,7 +1702,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
key = merge.key(); key = merge.key();
ParseInternalKey(key, &ikey); ParseInternalKey(key, &ikey);
value = merge.value(); value = merge.value();
} else if (options_.CompactionFilter != nullptr && } else if (options_.compaction_filter &&
ikey.type != kTypeDeletion && ikey.type != kTypeDeletion &&
visible_at_tip) { visible_at_tip) {
// If the user has specified a compaction filter and there are no // If the user has specified a compaction filter and there are no
@ -1712,11 +1713,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
assert(!drop); assert(!drop);
bool value_changed = false; bool value_changed = false;
compaction_filter_value.clear(); compaction_filter_value.clear();
drop = options_.CompactionFilter(options_.compaction_filter_args, drop = options_.compaction_filter->Filter(compact->compaction->level(),
compact->compaction->level(), ikey.user_key, value,
ikey.user_key, value, &compaction_filter_value,
&compaction_filter_value, &value_changed);
&value_changed);
// Another example of statistics update without holding the lock // Another example of statistics update without holding the lock
// TODO: clean it up // TODO: clean it up
if (drop) { if (drop) {

@ -12,6 +12,7 @@
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "leveldb/cache.h" #include "leveldb/cache.h"
#include "leveldb/compaction_filter.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "table/table.h" #include "table/table.h"
#include "util/hash.h" #include "util/hash.h"
@ -1369,35 +1370,64 @@ TEST(DBTest, RepeatedWritesToSameKey) {
// kvs during the compaction process. // kvs during the compaction process.
static int cfilter_count; static int cfilter_count;
static std::string NEW_VALUE = "NewValue"; static std::string NEW_VALUE = "NewValue";
static bool keep_filter(void* arg, int level, const Slice& key,
const Slice& value, std::string* new_value, class KeepFilter : public CompactionFilter {
bool* value_changed) { public:
assert(arg == nullptr); virtual bool Filter(int level, const Slice& key,
cfilter_count++; const Slice& value, std::string* new_value,
return false; bool* value_changed) const override {
} cfilter_count++;
static bool delete_filter(void*argv, int level, const Slice& key, return false;
const Slice& value, std::string* new_value, }
bool* value_changed) {
assert(argv == nullptr); virtual const char* Name() const override {
cfilter_count++; return "KeepFilter";
return true; }
}
static bool change_filter(void*argv, int level, const Slice& key, };
const Slice& value, std::string* new_value,
bool* value_changed) { class DeleteFilter : public CompactionFilter {
assert(argv == (void*)100); public:
assert(new_value != nullptr); virtual bool Filter(int level, const Slice& key,
*new_value = NEW_VALUE; const Slice& value, std::string* new_value,
*value_changed = true; bool* value_changed) const override {
return false; cfilter_count++;
} return true;
}
virtual const char* Name() const override {
return "DeleteFilter";
}
};
class ChangeFilter : public CompactionFilter {
public:
ChangeFilter(int argv) : argv_(argv) {}
virtual bool Filter(int level, const Slice& key,
const Slice& value, std::string* new_value,
bool* value_changed) const override {
assert(argv_ == 100);
assert(new_value != nullptr);
*new_value = NEW_VALUE;
*value_changed = true;
return false;
}
virtual const char* Name() const override {
return "ChangeFilter";
}
private:
const int argv_;
};
TEST(DBTest, CompactionFilter) { TEST(DBTest, CompactionFilter) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0; options.max_mem_compaction_level = 0;
options.CompactionFilter = keep_filter; auto keep_filter = std::make_shared<KeepFilter>();
options.compaction_filter = keep_filter.get();
Reopen(&options); Reopen(&options);
// Write 100K keys, these are written to a few files in L0. // Write 100K keys, these are written to a few files in L0.
@ -1472,7 +1502,8 @@ TEST(DBTest, CompactionFilter) {
// create a new database with the compaction // create a new database with the compaction
// filter in such a way that it deletes all keys // filter in such a way that it deletes all keys
options.CompactionFilter = delete_filter; auto delete_filter = std::make_shared<DeleteFilter>();
options.compaction_filter = delete_filter.get();
options.create_if_missing = true; options.create_if_missing = true;
DestroyAndReopen(&options); DestroyAndReopen(&options);
@ -1535,8 +1566,8 @@ TEST(DBTest, CompactionFilterWithValueChange) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.num_levels = 3; options.num_levels = 3;
options.max_mem_compaction_level = 0; options.max_mem_compaction_level = 0;
options.compaction_filter_args = (void *)100; auto change_filter = std::make_shared<ChangeFilter>(100);
options.CompactionFilter = change_filter; options.compaction_filter = change_filter.get();
Reopen(&options); Reopen(&options);
// Write 100K+1 keys, these are written to a few files // Write 100K+1 keys, these are written to a few files

@ -0,0 +1,46 @@
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_
#define STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_
#include <string>
namespace leveldb {
class Slice;
// CompactionFilter allows an application to modify/delete a key-value at
// the time of compaction.
class CompactionFilter {
public:
virtual ~CompactionFilter() {}
// The compaction process invokes this
// method for kv that is being compacted. A return value
// of false indicates that the kv should be preserved in the
// output of this compaction run and a return value of true
// indicates that this key-value should be removed from the
// output of the compaction. The application can inspect
// the existing value of the key and make decision based on it.
//
// When the value is to be preserved, the application has the option
// to modify the existing_value and pass it back through new_value.
// value_changed needs to be set to true in this case.
virtual bool Filter(int level,
const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const = 0;
// Returns a name that identifies this compaction filter.
// The name will be printed to LOG file on start up for diagnosis.
virtual const char* Name() const = 0;
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_COMPACTION_FILTER_H_

@ -22,6 +22,7 @@ class FilterPolicy;
class Logger; class Logger;
class MergeOperator; class MergeOperator;
class Snapshot; class Snapshot;
class CompactionFilter;
using std::shared_ptr; using std::shared_ptr;
@ -76,6 +77,11 @@ struct Options {
// Default: nullptr // Default: nullptr
const MergeOperator* merge_operator; const MergeOperator* merge_operator;
// Allows an application to modify/delete a key-value during background
// compaction.
// Default: nullptr
const CompactionFilter* compaction_filter;
// If true, the database will be created if it is missing. // If true, the database will be created if it is missing.
// Default: false // Default: false
bool create_if_missing; bool create_if_missing;
@ -367,28 +373,6 @@ struct Options {
// from the database, because otherwise the read can be very slow. // from the database, because otherwise the read can be very slow.
Options* PrepareForBulkLoad(); Options* PrepareForBulkLoad();
// This method allows an application to modify/delete a key-value at
// the time of compaction. The compaction process invokes this
// method for kv that is being compacted. A return value
// of false indicates that the kv should be preserved in the
// output of this compaction run and a return value of true
// indicates that this key-value should be removed from the
// output of the compaction. The application can inspect
// the existing value of the key and make decision based on it.
// When the value is to be preserved, the application has the option
// to modify the existing_value and pass it back through new_value.
// value_changed needs to be set to true in this case.
// The compaction_filter_args, if specified here, are passed
// back to the invocation of the CompactionFilter.
void* compaction_filter_args;
bool (*CompactionFilter)(void* compaction_filter_args,
int level, const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed);
// Disable automatic compactions. Manual compactions can still // Disable automatic compactions. Manual compactions can still
// be issued on this database. // be issued on this database.
bool disable_auto_compactions; bool disable_auto_compactions;

@ -7,6 +7,7 @@
#include <limits> #include <limits>
#include "leveldb/cache.h" #include "leveldb/cache.h"
#include "leveldb/compaction_filter.h"
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/filter_policy.h" #include "leveldb/filter_policy.h"
@ -17,6 +18,7 @@ namespace leveldb {
Options::Options() Options::Options()
: comparator(BytewiseComparator()), : comparator(BytewiseComparator()),
merge_operator(nullptr), merge_operator(nullptr),
compaction_filter(nullptr),
create_if_missing(false), create_if_missing(false),
error_if_exists(false), error_if_exists(false),
paranoid_checks(false), paranoid_checks(false),
@ -56,8 +58,6 @@ Options::Options()
max_manifest_file_size(std::numeric_limits<uint64_t>::max()), max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
no_block_cache(false), no_block_cache(false),
table_cache_numshardbits(4), table_cache_numshardbits(4),
compaction_filter_args(nullptr),
CompactionFilter(nullptr),
disable_auto_compactions(false), disable_auto_compactions(false),
WAL_ttl_seconds(0), WAL_ttl_seconds(0),
manifest_preallocation_size(4 * 1024 * 1024), manifest_preallocation_size(4 * 1024 * 1024),
@ -76,6 +76,8 @@ Options::Dump(Logger* log) const
Log(log," Options.comparator: %s", comparator->Name()); Log(log," Options.comparator: %s", comparator->Name());
Log(log," Options.merge_operator: %s", Log(log," Options.merge_operator: %s",
merge_operator? merge_operator->Name() : "None"); merge_operator? merge_operator->Name() : "None");
Log(log," Options.compaction_filter: %s",
compaction_filter? compaction_filter->Name() : "None");
Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.error_if_exists: %d", error_if_exists);
Log(log," Options.paranoid_checks: %d", paranoid_checks); Log(log," Options.paranoid_checks: %d", paranoid_checks);
Log(log," Options.env: %p", env); Log(log," Options.env: %p", env);
@ -162,10 +164,6 @@ Options::Dump(Logger* log) const
rate_limit); rate_limit);
Log(log," Options.rate_limit_delay_milliseconds: %d", Log(log," Options.rate_limit_delay_milliseconds: %d",
rate_limit_delay_milliseconds); rate_limit_delay_milliseconds);
Log(log," Options.compaction_filter_args: %p",
compaction_filter_args);
Log(log," Options.CompactionFilter: %p",
CompactionFilter);
Log(log," Options.disable_auto_compactions: %d", Log(log," Options.disable_auto_compactions: %d",
disable_auto_compactions); disable_auto_compactions);
Log(log," Options.WAL_ttl_seconds: %ld", Log(log," Options.WAL_ttl_seconds: %ld",

@ -75,10 +75,9 @@ DBWithTTL::DBWithTTL(const int32_t ttl,
Status& st, Status& st,
bool read_only) bool read_only)
: ttl_(ttl) { : ttl_(ttl) {
assert(options.CompactionFilter == nullptr); assert(options.compaction_filter == nullptr);
Options options_to_open = options; Options options_to_open = options;
options_to_open.compaction_filter_args = &ttl_; options_to_open.compaction_filter = this;
options_to_open.CompactionFilter = DeleteByTS;
if (read_only) { if (read_only) {
st = DB::OpenForReadOnly(options_to_open, dbname, &db_); st = DB::OpenForReadOnly(options_to_open, dbname, &db_);
} else { } else {
@ -105,16 +104,20 @@ Status UtilityDB::OpenTtlDB(
} }
// returns true(i.e. key-value to be deleted) if its TS has expired based on ttl // returns true(i.e. key-value to be deleted) if its TS has expired based on ttl
bool DBWithTTL::DeleteByTS( bool DBWithTTL::Filter(
void* args,
int level, int level,
const Slice& key, const Slice& key,
const Slice& old_val, const Slice& old_val,
std::string* new_val, std::string* new_val,
bool* value_changed) { bool* value_changed) const {
return IsStale(old_val, *(int32_t*)args); return IsStale(old_val, ttl_);
} }
const char* DBWithTTL::Name() const {
return "Delete By TTL";
}
// Gives back the current time // Gives back the current time
Status DBWithTTL::GetCurrentTime(int32_t& curtime) { Status DBWithTTL::GetCurrentTime(int32_t& curtime) {
return Env::Default()->GetCurrentTime((int64_t*)&curtime); return Env::Default()->GetCurrentTime((int64_t*)&curtime);

@ -6,11 +6,12 @@
#define LEVELDB_UTILITIES_TTL_DB_TTL_H_ #define LEVELDB_UTILITIES_TTL_DB_TTL_H_
#include "include/leveldb/db.h" #include "include/leveldb/db.h"
#include "include/leveldb/compaction_filter.h"
#include "db/db_impl.h" #include "db/db_impl.h"
namespace leveldb { namespace leveldb {
class DBWithTTL : public DB { class DBWithTTL : public DB, CompactionFilter {
public: public:
DBWithTTL(const int32_t ttl, DBWithTTL(const int32_t ttl,
const Options& options, const Options& options,
@ -71,12 +72,14 @@ class DBWithTTL : public DB {
// Simulate a db crash, no elegant closing of database. // Simulate a db crash, no elegant closing of database.
void TEST_Destroy_DBWithTtl(); void TEST_Destroy_DBWithTtl();
static bool DeleteByTS(void* args, // The following two methods are for CompactionFilter
int level, virtual bool Filter(int level,
const Slice& key, const Slice& key,
const Slice& old_val, const Slice& old_val,
std::string* new_val, std::string* new_val,
bool* value_changed); bool* value_changed) const override;
virtual const char* Name() const override;
static bool IsStale(const Slice& value, int32_t ttl); static bool IsStale(const Slice& value, int32_t ttl);

Loading…
Cancel
Save