From df2701373d6247c540b8818e2c14c1b1f02c8cfe Mon Sep 17 00:00:00 2001 From: Evan Shaw Date: Wed, 18 Jun 2014 14:23:47 +1200 Subject: [PATCH 1/3] Support for compaction filters in the C API --- db/c.cc | 69 ++++++++++++++++++++++++++++++++++++++++++++- include/rocksdb/c.h | 17 +++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/db/c.cc b/db/c.cc index b50e59ecc..ec4788844 100644 --- a/db/c.cc +++ b/db/c.cc @@ -14,6 +14,7 @@ #include #include #include "rocksdb/cache.h" +#include "rocksdb/compaction_filter.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -30,6 +31,7 @@ #include "rocksdb/table.h" using rocksdb::Cache; +using rocksdb::CompactionFilter; using rocksdb::Comparator; using rocksdb::CompressionType; using rocksdb::DB; @@ -77,6 +79,49 @@ struct rocksdb_logger_t { shared_ptr rep; }; struct rocksdb_cache_t { shared_ptr rep; }; struct rocksdb_livefiles_t { std::vector rep; }; +struct rocksdb_compactionfilter_t : public CompactionFilter { + void* state_; + void (*destructor_)(void*); + unsigned char (*filter_)( + void*, + int level, + const char* key, size_t key_length, + const char* existing_value, size_t value_length, + char** new_value, size_t *new_value_length, + unsigned char* value_changed); + const char* (*name_)(void*); + + virtual ~rocksdb_compactionfilter_t() { + (*destructor_)(state_); + } + + virtual bool Filter( + int level, + const Slice& key, + const Slice& existing_value, + std::string* new_value, + bool* value_changed) const { + char* c_new_value = NULL; + size_t new_value_length = 0; + unsigned char c_value_changed = 0; + unsigned char result = (*filter_)( + state_, + level, + key.data(), key.size(), + existing_value.data(), existing_value.size(), + &c_new_value, &new_value_length, &c_value_changed); + if (c_value_changed) { + new_value->assign(c_new_value, new_value_length); + *value_changed = true; + } + return result; + } + + virtual const char* Name() const { + return (*name_)(state_); + } +}; + struct rocksdb_comparator_t : public Comparator { void* state_; void (*destructor_)(void*); @@ -1119,10 +1164,32 @@ DB::GetUpdatesSince DB::GetDbIdentity DB::RunManualCompaction custom cache -compaction_filter table_properties_collectors */ +rocksdb_compactionfilter_t* rocksdb_compactionfilter_create( + void* state, + void (*destructor)(void*), + unsigned char (*filter)( + void*, + int level, + const char* key, size_t key_length, + const char* existing_value, size_t value_length, + char** new_value, size_t *new_value_length, + unsigned char* value_changed), + const char* (*name)(void*)) { + rocksdb_compactionfilter_t* result = new rocksdb_compactionfilter_t; + result->state_ = state; + result->destructor_ = destructor; + result->filter_ = filter; + result->name_ = name; + return result; +} + +void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t* filter) { + delete filter; +} + rocksdb_comparator_t* rocksdb_comparator_create( void* state, void (*destructor)(void*), diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 013ee5d2a..3a96bfbd1 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -56,6 +56,7 @@ extern "C" { typedef struct rocksdb_t rocksdb_t; typedef struct rocksdb_cache_t rocksdb_cache_t; +typedef struct rocksdb_compactionfilter_t rocksdb_compactionfilter_t; typedef struct rocksdb_comparator_t rocksdb_comparator_t; typedef struct rocksdb_env_t rocksdb_env_t; typedef struct rocksdb_filelock_t rocksdb_filelock_t; @@ -401,6 +402,22 @@ enum { }; extern void rocksdb_options_set_compaction_style(rocksdb_options_t*, int); extern void rocksdb_options_set_universal_compaction_options(rocksdb_options_t*, rocksdb_universal_compaction_options_t*); + +/* Compaction Filter */ + +extern rocksdb_compactionfilter_t* rocksdb_compactionfilter_create( + void* state, + void (*destructor)(void*), + unsigned char (*filter)( + void*, + int level, + const char* key, size_t key_length, + const char* existing_value, size_t value_length, + char** new_value, size_t *new_value_length, + unsigned char* value_changed), + const char* (*name)(void*)); +extern void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t*); + /* Comparator */ extern rocksdb_comparator_t* rocksdb_comparator_create( From d72313a7fa204c2803fa80ae44af65e7ba392363 Mon Sep 17 00:00:00 2001 From: Evan Shaw Date: Wed, 18 Jun 2014 14:28:11 +1200 Subject: [PATCH 2/3] Add a way to set compaction filter in the C API --- db/c.cc | 6 ++++++ include/rocksdb/c.h | 3 +++ 2 files changed, 9 insertions(+) diff --git a/db/c.cc b/db/c.cc index ec4788844..219f65094 100644 --- a/db/c.cc +++ b/db/c.cc @@ -676,6 +676,12 @@ void rocksdb_options_destroy(rocksdb_options_t* options) { delete options; } +void rocksdb_options_set_compaction_filter( + rocksdb_options_t* opt, + rocksdb_compactionfilter_t* filter) { + opt->rep.compaction_filter = filter; +} + void rocksdb_options_set_comparator( rocksdb_options_t* opt, rocksdb_comparator_t* cmp) { diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 3a96bfbd1..c7d9f7782 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -230,6 +230,9 @@ extern const char* rocksdb_writebatch_data(rocksdb_writebatch_t*, size_t *size); extern rocksdb_options_t* rocksdb_options_create(); extern void rocksdb_options_destroy(rocksdb_options_t*); +extern void rocksdb_options_set_compaction_filter( + rocksdb_options_t*, + rocksdb_compactionfilter_t*); extern void rocksdb_options_set_comparator( rocksdb_options_t*, rocksdb_comparator_t*); From 5363eb8ad49dfec67203b1389ecaf17d398b4430 Mon Sep 17 00:00:00 2001 From: Evan Shaw Date: Thu, 19 Jun 2014 15:45:26 +1200 Subject: [PATCH 3/3] Add a test for using compaction filters via the C API --- db/c_test.c | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/db/c_test.c b/db/c_test.c index 8ebce9085..89380a08b 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -154,6 +154,28 @@ static unsigned char FilterKeyMatch( return fake_filter_result; } +// Custom compaction filter +static void CFilterDestroy(void* arg) {} +static const char* CFilterName(void* arg) { return "foo"; } +static unsigned char CFilterFilter(void* arg, int level, const char* key, + size_t key_length, + const char* existing_value, + size_t value_length, char** new_value, + size_t* new_value_length, + unsigned char* value_changed) { + if (key_length == 3) { + if (memcmp(key, "bar", key_length) == 0) { + return 1; + } else if (memcmp(key, "baz", key_length) == 0) { + *value_changed = 1; + *new_value = "newbazvalue"; + *new_value_length = 11; + return 0; + } + } + return 0; +} + // Custom merge operator static void MergeOperatorDestroy(void* arg) { } static const char* MergeOperatorName(void* arg) { @@ -407,6 +429,37 @@ int main(int argc, char** argv) { rocksdb_filterpolicy_destroy(policy); } + StartPhase("compaction_filter"); + { + rocksdb_compactionfilter_t* cfilter; + cfilter = rocksdb_compactionfilter_create(NULL, CFilterDestroy, + CFilterFilter, CFilterName); + // Create new database + rocksdb_close(db); + rocksdb_destroy_db(options, dbname, &err); + rocksdb_options_set_compaction_filter(options, cfilter); + db = rocksdb_open(options, dbname, &err); + CheckNoError(err); + rocksdb_put(db, woptions, "foo", 3, "foovalue", 8, &err); + CheckNoError(err); + CheckGet(db, roptions, "foo", "foovalue"); + rocksdb_put(db, woptions, "bar", 3, "barvalue", 8, &err); + CheckNoError(err); + CheckGet(db, roptions, "bar", "barvalue"); + rocksdb_put(db, woptions, "baz", 3, "bazvalue", 8, &err); + CheckNoError(err); + CheckGet(db, roptions, "baz", "bazvalue"); + + // Force compaction + rocksdb_compact_range(db, NULL, 0, NULL, 0); + // should have filtered bar, but not foo + CheckGet(db, roptions, "foo", "foovalue"); + CheckGet(db, roptions, "bar", NULL); + CheckGet(db, roptions, "baz", "newbazvalue"); + + rocksdb_compactionfilter_destroy(cfilter); + } + StartPhase("merge_operator"); { rocksdb_mergeoperator_t* merge_operator;