diff --git a/db/c.cc b/db/c.cc index c28dd0274..f65285f2c 100644 --- a/db/c.cc +++ b/db/c.cc @@ -35,6 +35,7 @@ using rocksdb::ColumnFamilyDescriptor; using rocksdb::ColumnFamilyHandle; using rocksdb::ColumnFamilyOptions; using rocksdb::CompactionFilter; +using rocksdb::CompactionFilterFactory; using rocksdb::Comparator; using rocksdb::CompressionType; using rocksdb::DB; @@ -84,6 +85,10 @@ struct rocksdb_cache_t { shared_ptr rep; }; struct rocksdb_livefiles_t { std::vector rep; }; struct rocksdb_column_family_handle_t { ColumnFamilyHandle* rep; }; +struct rocksdb_compactionfiltercontext_t { + CompactionFilter::Context rep; +}; + struct rocksdb_compactionfilter_t : public CompactionFilter { void* state_; void (*destructor_)(void*); @@ -127,6 +132,26 @@ struct rocksdb_compactionfilter_t : public CompactionFilter { } }; +struct rocksdb_compactionfilterfactory_t : public CompactionFilterFactory { + void* state_; + void (*destructor_)(void*); + rocksdb_compactionfilter_t* (*create_compaction_filter_)( + void*, rocksdb_compactionfiltercontext_t* context); + const char* (*name_)(void*); + + virtual ~rocksdb_compactionfilterfactory_t() { (*destructor_)(state_); } + + virtual std::unique_ptr CreateCompactionFilter( + const CompactionFilter::Context& context) { + rocksdb_compactionfiltercontext_t ccontext; + ccontext.rep = context; + CompactionFilter* cf = (*create_compaction_filter_)(state_, &ccontext); + return std::unique_ptr(cf); + } + + virtual const char* Name() const { return (*name_)(state_); } +}; + struct rocksdb_comparator_t : public Comparator { void* state_; void (*destructor_)(void*); @@ -931,6 +956,12 @@ void rocksdb_options_set_compaction_filter( opt->rep.compaction_filter = filter; } +void rocksdb_options_set_compaction_filter_factory( + rocksdb_options_t* opt, rocksdb_compactionfilterfactory_t* factory) { + opt->rep.compaction_filter_factory = + std::shared_ptr(factory); +} + void rocksdb_options_set_comparator( rocksdb_options_t* opt, rocksdb_comparator_t* cmp) { @@ -1445,6 +1476,35 @@ void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t* filter) { delete filter; } +unsigned char rocksdb_compactionfiltercontext_is_full_compaction( + rocksdb_compactionfiltercontext_t* context) { + return context->rep.is_full_compaction; +} + +unsigned char rocksdb_compactionfiltercontext_is_manual_compaction( + rocksdb_compactionfiltercontext_t* context) { + return context->rep.is_manual_compaction; +} + +rocksdb_compactionfilterfactory_t* rocksdb_compactionfilterfactory_create( + void* state, void (*destructor)(void*), + rocksdb_compactionfilter_t* (*create_compaction_filter)( + void*, rocksdb_compactionfiltercontext_t* context), + const char* (*name)(void*)) { + rocksdb_compactionfilterfactory_t* result = + new rocksdb_compactionfilterfactory_t; + result->state_ = state; + result->destructor_ = destructor; + result->create_compaction_filter_ = create_compaction_filter; + result->name_ = name; + return result; +} + +void rocksdb_compactionfilterfactory_destroy( + rocksdb_compactionfilterfactory_t* factory) { + delete factory; +} + rocksdb_comparator_t* rocksdb_comparator_create( void* state, void (*destructor)(void*), diff --git a/db/c_test.c b/db/c_test.c index e7e6d9208..1cc0a65cf 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -192,6 +192,39 @@ static unsigned char CFilterFilter(void* arg, int level, const char* key, return 0; } +static void CFilterFactoryDestroy(void* arg) {} +static const char* CFilterFactoryName(void* arg) { return "foo"; } +static rocksdb_compactionfilter_t* CFilterCreate( + void* arg, rocksdb_compactionfiltercontext_t* context) { + return rocksdb_compactionfilter_create(NULL, CFilterDestroy, CFilterFilter, + CFilterName); +} + +static rocksdb_t* CheckCompaction(rocksdb_t* db, rocksdb_options_t* options, + rocksdb_readoptions_t* roptions, + rocksdb_writeoptions_t* woptions) { + char* err = NULL; + db = rocksdb_open(options, dbname, &err); + CheckNoError(err); + rocksdb_put(db, woptions, "foo", 3, "foovalue", 8, &err); + CheckNoError(err); + CheckGet(db, roptions, "foo", "foovalue"); + rocksdb_put(db, woptions, "bar", 3, "barvalue", 8, &err); + CheckNoError(err); + CheckGet(db, roptions, "bar", "barvalue"); + rocksdb_put(db, woptions, "baz", 3, "bazvalue", 8, &err); + CheckNoError(err); + CheckGet(db, roptions, "baz", "bazvalue"); + + // Force compaction + rocksdb_compact_range(db, NULL, 0, NULL, 0); + // should have filtered bar, but not foo + CheckGet(db, roptions, "foo", "foovalue"); + CheckGet(db, roptions, "bar", NULL); + CheckGet(db, roptions, "baz", "newbazvalue"); + return db; +} + // Custom merge operator static void MergeOperatorDestroy(void* arg) { } static const char* MergeOperatorName(void* arg) { @@ -465,6 +498,8 @@ int main(int argc, char** argv) { StartPhase("compaction_filter"); { + rocksdb_options_t* options = rocksdb_options_create(); + rocksdb_options_set_create_if_missing(options, 1); rocksdb_compactionfilter_t* cfilter; cfilter = rocksdb_compactionfilter_create(NULL, CFilterDestroy, CFilterFilter, CFilterName); @@ -472,26 +507,28 @@ int main(int argc, char** argv) { rocksdb_close(db); rocksdb_destroy_db(options, dbname, &err); rocksdb_options_set_compaction_filter(options, cfilter); - db = rocksdb_open(options, dbname, &err); - CheckNoError(err); - rocksdb_put(db, woptions, "foo", 3, "foovalue", 8, &err); - CheckNoError(err); - CheckGet(db, roptions, "foo", "foovalue"); - rocksdb_put(db, woptions, "bar", 3, "barvalue", 8, &err); - CheckNoError(err); - CheckGet(db, roptions, "bar", "barvalue"); - rocksdb_put(db, woptions, "baz", 3, "bazvalue", 8, &err); - CheckNoError(err); - CheckGet(db, roptions, "baz", "bazvalue"); - - // Force compaction - rocksdb_compact_range(db, NULL, 0, NULL, 0); - // should have filtered bar, but not foo - CheckGet(db, roptions, "foo", "foovalue"); - CheckGet(db, roptions, "bar", NULL); - CheckGet(db, roptions, "baz", "newbazvalue"); + db = CheckCompaction(db, options, roptions, woptions); + rocksdb_options_set_compaction_filter(options, NULL); rocksdb_compactionfilter_destroy(cfilter); + rocksdb_options_destroy(options); + } + + StartPhase("compaction_filter_factory"); + { + rocksdb_options_t* options = rocksdb_options_create(); + rocksdb_options_set_create_if_missing(options, 1); + rocksdb_compactionfilterfactory_t* factory; + factory = rocksdb_compactionfilterfactory_create( + NULL, CFilterFactoryDestroy, CFilterCreate, CFilterFactoryName); + // Create new database + rocksdb_close(db); + rocksdb_destroy_db(options, dbname, &err); + rocksdb_options_set_compaction_filter_factory(options, factory); + db = CheckCompaction(db, options, roptions, woptions); + + rocksdb_options_set_compaction_filter_factory(options, NULL); + rocksdb_options_destroy(options); } StartPhase("merge_operator"); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 09790f6c0..65992e967 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -57,6 +57,10 @@ extern "C" { typedef struct rocksdb_t rocksdb_t; typedef struct rocksdb_cache_t rocksdb_cache_t; typedef struct rocksdb_compactionfilter_t rocksdb_compactionfilter_t; +typedef struct rocksdb_compactionfiltercontext_t + rocksdb_compactionfiltercontext_t; +typedef struct rocksdb_compactionfilterfactory_t + rocksdb_compactionfilterfactory_t; typedef struct rocksdb_comparator_t rocksdb_comparator_t; typedef struct rocksdb_env_t rocksdb_env_t; typedef struct rocksdb_filelock_t rocksdb_filelock_t; @@ -343,6 +347,8 @@ extern void rocksdb_options_destroy(rocksdb_options_t*); extern void rocksdb_options_set_compaction_filter( rocksdb_options_t*, rocksdb_compactionfilter_t*); +extern void rocksdb_options_set_compaction_filter_factory( + rocksdb_options_t*, rocksdb_compactionfilterfactory_t*); extern void rocksdb_options_set_comparator( rocksdb_options_t*, rocksdb_comparator_t*); @@ -531,6 +537,25 @@ extern rocksdb_compactionfilter_t* rocksdb_compactionfilter_create( const char* (*name)(void*)); extern void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t*); +/* Compaction Filter Context */ + +extern unsigned char rocksdb_compactionfiltercontext_is_full_compaction( + rocksdb_compactionfiltercontext_t* context); + +extern unsigned char rocksdb_compactionfiltercontext_is_manual_compaction( + rocksdb_compactionfiltercontext_t* context); + +/* Compaction Filter Factory */ + +extern rocksdb_compactionfilterfactory_t* + rocksdb_compactionfilterfactory_create( + void* state, void (*destructor)(void*), + rocksdb_compactionfilter_t* (*create_compaction_filter)( + void*, rocksdb_compactionfiltercontext_t* context), + const char* (*name)(void*)); +extern void rocksdb_compactionfilterfactory_destroy( + rocksdb_compactionfilterfactory_t*); + /* Comparator */ extern rocksdb_comparator_t* rocksdb_comparator_create( diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 59b050923..9c24fc501 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -86,7 +86,7 @@ class CompactionFilterV2 { // // Each entry in the return vector indicates if the corresponding kv should // be preserved in the output of this compaction run. The application can - // inspect the exisitng values of the keys and make decision based on it. + // inspect the existing values of the keys and make decision based on it. // // When a value is to be preserved, the application has the option // to modify the entry in existing_values and pass it back through an entry @@ -108,7 +108,7 @@ class CompactionFilterV2 { }; // Each compaction will create a new CompactionFilter allowing the -// application to know about different campactions +// application to know about different compactions class CompactionFilterFactory { public: virtual ~CompactionFilterFactory() { } @@ -120,7 +120,7 @@ class CompactionFilterFactory { virtual const char* Name() const = 0; }; -// Default implementaion of CompactionFilterFactory which does not +// Default implementation of CompactionFilterFactory which does not // return any filter class DefaultCompactionFilterFactory : public CompactionFilterFactory { public: @@ -175,7 +175,7 @@ class CompactionFilterFactoryV2 { const SliceTransform* prefix_extractor_; }; -// Default implementaion of CompactionFilterFactoryV2 which does not +// Default implementation of CompactionFilterFactoryV2 which does not // return any filter class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 { public: