Merge pull request #191 from edsrzf/c-api-compaction-filter-factory

C API: Add support for compaction filter factories (v1)
main
Igor Canadi 10 years ago
commit 8f6e9ab209
  1. 60
      db/c.cc
  2. 73
      db/c_test.c
  3. 25
      include/rocksdb/c.h
  4. 8
      include/rocksdb/compaction_filter.h

@ -35,6 +35,7 @@ using rocksdb::ColumnFamilyDescriptor;
using rocksdb::ColumnFamilyHandle;
using rocksdb::ColumnFamilyOptions;
using rocksdb::CompactionFilter;
using rocksdb::CompactionFilterFactory;
using rocksdb::Comparator;
using rocksdb::CompressionType;
using rocksdb::DB;
@ -84,6 +85,10 @@ struct rocksdb_cache_t { shared_ptr<Cache> rep; };
struct rocksdb_livefiles_t { std::vector<LiveFileMetaData> rep; };
struct rocksdb_column_family_handle_t { ColumnFamilyHandle* rep; };
struct rocksdb_compactionfiltercontext_t {
CompactionFilter::Context rep;
};
struct rocksdb_compactionfilter_t : public CompactionFilter {
void* state_;
void (*destructor_)(void*);
@ -127,6 +132,26 @@ struct rocksdb_compactionfilter_t : public CompactionFilter {
}
};
struct rocksdb_compactionfilterfactory_t : public CompactionFilterFactory {
void* state_;
void (*destructor_)(void*);
rocksdb_compactionfilter_t* (*create_compaction_filter_)(
void*, rocksdb_compactionfiltercontext_t* context);
const char* (*name_)(void*);
virtual ~rocksdb_compactionfilterfactory_t() { (*destructor_)(state_); }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) {
rocksdb_compactionfiltercontext_t ccontext;
ccontext.rep = context;
CompactionFilter* cf = (*create_compaction_filter_)(state_, &ccontext);
return std::unique_ptr<CompactionFilter>(cf);
}
virtual const char* Name() const { return (*name_)(state_); }
};
struct rocksdb_comparator_t : public Comparator {
void* state_;
void (*destructor_)(void*);
@ -931,6 +956,12 @@ void rocksdb_options_set_compaction_filter(
opt->rep.compaction_filter = filter;
}
void rocksdb_options_set_compaction_filter_factory(
rocksdb_options_t* opt, rocksdb_compactionfilterfactory_t* factory) {
opt->rep.compaction_filter_factory =
std::shared_ptr<CompactionFilterFactory>(factory);
}
void rocksdb_options_set_comparator(
rocksdb_options_t* opt,
rocksdb_comparator_t* cmp) {
@ -1445,6 +1476,35 @@ void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t* filter) {
delete filter;
}
unsigned char rocksdb_compactionfiltercontext_is_full_compaction(
rocksdb_compactionfiltercontext_t* context) {
return context->rep.is_full_compaction;
}
unsigned char rocksdb_compactionfiltercontext_is_manual_compaction(
rocksdb_compactionfiltercontext_t* context) {
return context->rep.is_manual_compaction;
}
rocksdb_compactionfilterfactory_t* rocksdb_compactionfilterfactory_create(
void* state, void (*destructor)(void*),
rocksdb_compactionfilter_t* (*create_compaction_filter)(
void*, rocksdb_compactionfiltercontext_t* context),
const char* (*name)(void*)) {
rocksdb_compactionfilterfactory_t* result =
new rocksdb_compactionfilterfactory_t;
result->state_ = state;
result->destructor_ = destructor;
result->create_compaction_filter_ = create_compaction_filter;
result->name_ = name;
return result;
}
void rocksdb_compactionfilterfactory_destroy(
rocksdb_compactionfilterfactory_t* factory) {
delete factory;
}
rocksdb_comparator_t* rocksdb_comparator_create(
void* state,
void (*destructor)(void*),

@ -192,6 +192,39 @@ static unsigned char CFilterFilter(void* arg, int level, const char* key,
return 0;
}
static void CFilterFactoryDestroy(void* arg) {}
static const char* CFilterFactoryName(void* arg) { return "foo"; }
static rocksdb_compactionfilter_t* CFilterCreate(
void* arg, rocksdb_compactionfiltercontext_t* context) {
return rocksdb_compactionfilter_create(NULL, CFilterDestroy, CFilterFilter,
CFilterName);
}
static rocksdb_t* CheckCompaction(rocksdb_t* db, rocksdb_options_t* options,
rocksdb_readoptions_t* roptions,
rocksdb_writeoptions_t* woptions) {
char* err = NULL;
db = rocksdb_open(options, dbname, &err);
CheckNoError(err);
rocksdb_put(db, woptions, "foo", 3, "foovalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "foo", "foovalue");
rocksdb_put(db, woptions, "bar", 3, "barvalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "bar", "barvalue");
rocksdb_put(db, woptions, "baz", 3, "bazvalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "baz", "bazvalue");
// Force compaction
rocksdb_compact_range(db, NULL, 0, NULL, 0);
// should have filtered bar, but not foo
CheckGet(db, roptions, "foo", "foovalue");
CheckGet(db, roptions, "bar", NULL);
CheckGet(db, roptions, "baz", "newbazvalue");
return db;
}
// Custom merge operator
static void MergeOperatorDestroy(void* arg) { }
static const char* MergeOperatorName(void* arg) {
@ -465,6 +498,8 @@ int main(int argc, char** argv) {
StartPhase("compaction_filter");
{
rocksdb_options_t* options = rocksdb_options_create();
rocksdb_options_set_create_if_missing(options, 1);
rocksdb_compactionfilter_t* cfilter;
cfilter = rocksdb_compactionfilter_create(NULL, CFilterDestroy,
CFilterFilter, CFilterName);
@ -472,26 +507,28 @@ int main(int argc, char** argv) {
rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &err);
rocksdb_options_set_compaction_filter(options, cfilter);
db = rocksdb_open(options, dbname, &err);
CheckNoError(err);
rocksdb_put(db, woptions, "foo", 3, "foovalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "foo", "foovalue");
rocksdb_put(db, woptions, "bar", 3, "barvalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "bar", "barvalue");
rocksdb_put(db, woptions, "baz", 3, "bazvalue", 8, &err);
CheckNoError(err);
CheckGet(db, roptions, "baz", "bazvalue");
// Force compaction
rocksdb_compact_range(db, NULL, 0, NULL, 0);
// should have filtered bar, but not foo
CheckGet(db, roptions, "foo", "foovalue");
CheckGet(db, roptions, "bar", NULL);
CheckGet(db, roptions, "baz", "newbazvalue");
db = CheckCompaction(db, options, roptions, woptions);
rocksdb_options_set_compaction_filter(options, NULL);
rocksdb_compactionfilter_destroy(cfilter);
rocksdb_options_destroy(options);
}
StartPhase("compaction_filter_factory");
{
rocksdb_options_t* options = rocksdb_options_create();
rocksdb_options_set_create_if_missing(options, 1);
rocksdb_compactionfilterfactory_t* factory;
factory = rocksdb_compactionfilterfactory_create(
NULL, CFilterFactoryDestroy, CFilterCreate, CFilterFactoryName);
// Create new database
rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &err);
rocksdb_options_set_compaction_filter_factory(options, factory);
db = CheckCompaction(db, options, roptions, woptions);
rocksdb_options_set_compaction_filter_factory(options, NULL);
rocksdb_options_destroy(options);
}
StartPhase("merge_operator");

@ -57,6 +57,10 @@ extern "C" {
typedef struct rocksdb_t rocksdb_t;
typedef struct rocksdb_cache_t rocksdb_cache_t;
typedef struct rocksdb_compactionfilter_t rocksdb_compactionfilter_t;
typedef struct rocksdb_compactionfiltercontext_t
rocksdb_compactionfiltercontext_t;
typedef struct rocksdb_compactionfilterfactory_t
rocksdb_compactionfilterfactory_t;
typedef struct rocksdb_comparator_t rocksdb_comparator_t;
typedef struct rocksdb_env_t rocksdb_env_t;
typedef struct rocksdb_filelock_t rocksdb_filelock_t;
@ -343,6 +347,8 @@ extern void rocksdb_options_destroy(rocksdb_options_t*);
extern void rocksdb_options_set_compaction_filter(
rocksdb_options_t*,
rocksdb_compactionfilter_t*);
extern void rocksdb_options_set_compaction_filter_factory(
rocksdb_options_t*, rocksdb_compactionfilterfactory_t*);
extern void rocksdb_options_set_comparator(
rocksdb_options_t*,
rocksdb_comparator_t*);
@ -531,6 +537,25 @@ extern rocksdb_compactionfilter_t* rocksdb_compactionfilter_create(
const char* (*name)(void*));
extern void rocksdb_compactionfilter_destroy(rocksdb_compactionfilter_t*);
/* Compaction Filter Context */
extern unsigned char rocksdb_compactionfiltercontext_is_full_compaction(
rocksdb_compactionfiltercontext_t* context);
extern unsigned char rocksdb_compactionfiltercontext_is_manual_compaction(
rocksdb_compactionfiltercontext_t* context);
/* Compaction Filter Factory */
extern rocksdb_compactionfilterfactory_t*
rocksdb_compactionfilterfactory_create(
void* state, void (*destructor)(void*),
rocksdb_compactionfilter_t* (*create_compaction_filter)(
void*, rocksdb_compactionfiltercontext_t* context),
const char* (*name)(void*));
extern void rocksdb_compactionfilterfactory_destroy(
rocksdb_compactionfilterfactory_t*);
/* Comparator */
extern rocksdb_comparator_t* rocksdb_comparator_create(

@ -86,7 +86,7 @@ class CompactionFilterV2 {
//
// Each entry in the return vector indicates if the corresponding kv should
// be preserved in the output of this compaction run. The application can
// inspect the exisitng values of the keys and make decision based on it.
// inspect the existing values of the keys and make decision based on it.
//
// When a value is to be preserved, the application has the option
// to modify the entry in existing_values and pass it back through an entry
@ -108,7 +108,7 @@ class CompactionFilterV2 {
};
// Each compaction will create a new CompactionFilter allowing the
// application to know about different campactions
// application to know about different compactions
class CompactionFilterFactory {
public:
virtual ~CompactionFilterFactory() { }
@ -120,7 +120,7 @@ class CompactionFilterFactory {
virtual const char* Name() const = 0;
};
// Default implementaion of CompactionFilterFactory which does not
// Default implementation of CompactionFilterFactory which does not
// return any filter
class DefaultCompactionFilterFactory : public CompactionFilterFactory {
public:
@ -175,7 +175,7 @@ class CompactionFilterFactoryV2 {
const SliceTransform* prefix_extractor_;
};
// Default implementaion of CompactionFilterFactoryV2 which does not
// Default implementation of CompactionFilterFactoryV2 which does not
// return any filter
class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:

Loading…
Cancel
Save