@ -3284,6 +3284,22 @@ class DeleteFilter : public CompactionFilter {
virtual const char * Name ( ) const override { return " DeleteFilter " ; }
} ;
class ConditionalFilter : public CompactionFilter {
public :
explicit ConditionalFilter ( const std : : string * filtered_value )
: filtered_value_ ( filtered_value ) { }
virtual bool Filter ( int level , const Slice & key , const Slice & value ,
std : : string * new_value ,
bool * value_changed ) const override {
return value . ToString ( ) = = * filtered_value_ ;
}
virtual const char * Name ( ) const override { return " ConditionalFilter " ; }
private :
const std : : string * filtered_value_ ;
} ;
class ChangeFilter : public CompactionFilter {
public :
explicit ChangeFilter ( ) { }
@ -3334,6 +3350,25 @@ class DeleteFilterFactory : public CompactionFilterFactory {
virtual const char * Name ( ) const override { return " DeleteFilterFactory " ; }
} ;
class ConditionalFilterFactory : public CompactionFilterFactory {
public :
explicit ConditionalFilterFactory ( const Slice & filtered_value )
: filtered_value_ ( filtered_value . ToString ( ) ) { }
virtual std : : unique_ptr < CompactionFilter > CreateCompactionFilter (
const CompactionFilter : : Context & context ) override {
return std : : unique_ptr < CompactionFilter > (
new ConditionalFilter ( & filtered_value_ ) ) ;
}
virtual const char * Name ( ) const override {
return " ConditionalFilterFactory " ;
}
private :
std : : string filtered_value_ ;
} ;
class ChangeFilterFactory : public CompactionFilterFactory {
public :
explicit ChangeFilterFactory ( ) { }
@ -4721,6 +4756,75 @@ TEST(DBTest, CompactionFilterWithValueChange) {
} while ( ChangeCompactOptions ( ) ) ;
}
TEST ( DBTest , CompactionFilterWithMergeOperator ) {
std : : string one , two , three , four ;
PutFixed64 ( & one , 1 ) ;
PutFixed64 ( & two , 2 ) ;
PutFixed64 ( & three , 3 ) ;
PutFixed64 ( & four , 4 ) ;
Options options ;
options = CurrentOptions ( options ) ;
options . create_if_missing = true ;
options . merge_operator = MergeOperators : : CreateUInt64AddOperator ( ) ;
options . num_levels = 3 ;
options . max_mem_compaction_level = 0 ;
// Filter out keys with value is 2.
options . compaction_filter_factory =
std : : make_shared < ConditionalFilterFactory > ( two ) ;
DestroyAndReopen ( options ) ;
// In the same compaction, a value type needs to be deleted based on
// compaction filter, and there is a merge type for the key. compaction
// filter result is ignored.
ASSERT_OK ( db_ - > Put ( WriteOptions ( ) , " foo " , two ) ) ;
ASSERT_OK ( Flush ( ) ) ;
ASSERT_OK ( db_ - > Merge ( WriteOptions ( ) , " foo " , one ) ) ;
ASSERT_OK ( Flush ( ) ) ;
std : : string newvalue = Get ( " foo " ) ;
ASSERT_EQ ( newvalue , three ) ;
dbfull ( ) - > CompactRange ( nullptr , nullptr ) ;
newvalue = Get ( " foo " ) ;
ASSERT_EQ ( newvalue , three ) ;
// value key can be deleted based on compaction filter, leaving only
// merge keys.
ASSERT_OK ( db_ - > Put ( WriteOptions ( ) , " bar " , two ) ) ;
ASSERT_OK ( Flush ( ) ) ;
dbfull ( ) - > CompactRange ( nullptr , nullptr ) ;
newvalue = Get ( " bar " ) ;
ASSERT_EQ ( " NOT_FOUND " , newvalue ) ;
ASSERT_OK ( db_ - > Merge ( WriteOptions ( ) , " bar " , two ) ) ;
ASSERT_OK ( Flush ( ) ) ;
dbfull ( ) - > CompactRange ( nullptr , nullptr ) ;
newvalue = Get ( " bar " ) ;
ASSERT_EQ ( two , two ) ;
// Compaction filter never applies to merge keys.
ASSERT_OK ( db_ - > Put ( WriteOptions ( ) , " foobar " , one ) ) ;
ASSERT_OK ( Flush ( ) ) ;
ASSERT_OK ( db_ - > Merge ( WriteOptions ( ) , " foobar " , two ) ) ;
ASSERT_OK ( Flush ( ) ) ;
newvalue = Get ( " foobar " ) ;
ASSERT_EQ ( newvalue , three ) ;
dbfull ( ) - > CompactRange ( nullptr , nullptr ) ;
newvalue = Get ( " foobar " ) ;
ASSERT_EQ ( newvalue , three ) ;
// In the same compaction, both of value type and merge type keys need to be
// deleted based on compaction filter, and there is a merge type for the key.
// For both keys, compaction filter results are ignored.
ASSERT_OK ( db_ - > Put ( WriteOptions ( ) , " barfoo " , two ) ) ;
ASSERT_OK ( Flush ( ) ) ;
ASSERT_OK ( db_ - > Merge ( WriteOptions ( ) , " barfoo " , two ) ) ;
ASSERT_OK ( Flush ( ) ) ;
newvalue = Get ( " barfoo " ) ;
ASSERT_EQ ( newvalue , four ) ;
dbfull ( ) - > CompactRange ( nullptr , nullptr ) ;
newvalue = Get ( " barfoo " ) ;
ASSERT_EQ ( newvalue , four ) ;
}
TEST ( DBTest , CompactionFilterContextManual ) {
KeepFilterFactory * filter = new KeepFilterFactory ( ) ;