@ -2383,6 +2383,97 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ ( NumTableFilesAtLevel ( 1 ) , 1 ) ;
ASSERT_EQ ( NumTableFilesAtLevel ( 1 ) , 1 ) ;
}
}
// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count ;
static std : : string NEW_VALUE = " NewValue " ;
class KeepFilter : public CompactionFilter {
public :
virtual bool Filter ( int level , const Slice & key , const Slice & value ,
std : : string * new_value , bool * value_changed ) const
override {
cfilter_count + + ;
return false ;
}
virtual const char * Name ( ) const override { return " KeepFilter " ; }
} ;
class DeleteFilter : public CompactionFilter {
public :
virtual bool Filter ( int level , const Slice & key , const Slice & value ,
std : : string * new_value , bool * value_changed ) const
override {
cfilter_count + + ;
return true ;
}
virtual const char * Name ( ) const override { return " DeleteFilter " ; }
} ;
class ChangeFilter : public CompactionFilter {
public :
explicit ChangeFilter ( ) { }
virtual bool Filter ( int level , const Slice & key , const Slice & value ,
std : : string * new_value , bool * value_changed ) const
override {
assert ( new_value ! = nullptr ) ;
* new_value = NEW_VALUE ;
* value_changed = true ;
return false ;
}
virtual const char * Name ( ) const override { return " ChangeFilter " ; }
} ;
class KeepFilterFactory : public CompactionFilterFactory {
public :
explicit KeepFilterFactory ( bool check_context = false )
: check_context_ ( check_context ) { }
virtual std : : unique_ptr < CompactionFilter > CreateCompactionFilter (
const CompactionFilter : : Context & context ) override {
if ( check_context_ ) {
ASSERT_EQ ( expect_full_compaction_ . load ( ) , context . is_full_compaction ) ;
ASSERT_EQ ( expect_manual_compaction_ . load ( ) , context . is_manual_compaction ) ;
}
return std : : unique_ptr < CompactionFilter > ( new KeepFilter ( ) ) ;
}
virtual const char * Name ( ) const override { return " KeepFilterFactory " ; }
bool check_context_ ;
std : : atomic_bool expect_full_compaction_ ;
std : : atomic_bool expect_manual_compaction_ ;
} ;
class DeleteFilterFactory : public CompactionFilterFactory {
public :
virtual std : : unique_ptr < CompactionFilter > CreateCompactionFilter (
const CompactionFilter : : Context & context ) override {
if ( context . is_manual_compaction ) {
return std : : unique_ptr < CompactionFilter > ( new DeleteFilter ( ) ) ;
} else {
return std : : unique_ptr < CompactionFilter > ( nullptr ) ;
}
}
virtual const char * Name ( ) const override { return " DeleteFilterFactory " ; }
} ;
class ChangeFilterFactory : public CompactionFilterFactory {
public :
explicit ChangeFilterFactory ( ) { }
virtual std : : unique_ptr < CompactionFilter > CreateCompactionFilter (
const CompactionFilter : : Context & context ) override {
return std : : unique_ptr < CompactionFilter > ( new ChangeFilter ( ) ) ;
}
virtual const char * Name ( ) const override { return " ChangeFilterFactory " ; }
} ;
// TODO(kailiu) The tests on UniversalCompaction has some issues:
// TODO(kailiu) The tests on UniversalCompaction has some issues:
// 1. A lot of magic numbers ("11" or "12").
// 1. A lot of magic numbers ("11" or "12").
// 2. Made assumption on the memtable flush conidtions, which may change from
// 2. Made assumption on the memtable flush conidtions, which may change from
@ -2393,11 +2484,16 @@ TEST(DBTest, UniversalCompactionTrigger) {
options . write_buffer_size = 100 < < 10 ; //100KB
options . write_buffer_size = 100 < < 10 ; //100KB
// trigger compaction if there are >= 4 files
// trigger compaction if there are >= 4 files
options . level0_file_num_compaction_trigger = 4 ;
options . level0_file_num_compaction_trigger = 4 ;
KeepFilterFactory * filter = new KeepFilterFactory ( true ) ;
filter - > expect_manual_compaction_ . store ( false ) ;
options . compaction_filter_factory . reset ( filter ) ;
Reopen ( & options ) ;
Reopen ( & options ) ;
Random rnd ( 301 ) ;
Random rnd ( 301 ) ;
int key_idx = 0 ;
int key_idx = 0 ;
filter - > expect_full_compaction_ . store ( true ) ;
// Stage 1:
// Stage 1:
// Generate a set of files at level 0, but don't trigger level-0
// Generate a set of files at level 0, but don't trigger level-0
// compaction.
// compaction.
@ -2415,6 +2511,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Generate one more file at level-0, which should trigger level-0
// Generate one more file at level-0, which should trigger level-0
// compaction.
// compaction.
filter - > expect_full_compaction_ . store ( false ) ;
for ( int i = 0 ; i < 11 ; i + + ) {
for ( int i = 0 ; i < 11 ; i + + ) {
ASSERT_OK ( Put ( Key ( key_idx ) , RandomString ( & rnd , 10000 ) ) ) ;
ASSERT_OK ( Put ( Key ( key_idx ) , RandomString ( & rnd , 10000 ) ) ) ;
key_idx + + ;
key_idx + + ;
@ -2508,6 +2605,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Stage 5:
// Stage 5:
// Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
// Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
// a new file of size 1.
// a new file of size 1.
filter - > expect_full_compaction_ . store ( true ) ;
for ( int i = 0 ; i < 11 ; i + + ) {
for ( int i = 0 ; i < 11 ; i + + ) {
ASSERT_OK ( Put ( Key ( key_idx ) , RandomString ( & rnd , 10000 ) ) ) ;
ASSERT_OK ( Put ( Key ( key_idx ) , RandomString ( & rnd , 10000 ) ) ) ;
key_idx + + ;
key_idx + + ;
@ -3230,100 +3328,6 @@ TEST(DBTest, InPlaceUpdateCallbackNoAction) {
} while ( ChangeCompactOptions ( ) ) ;
} while ( ChangeCompactOptions ( ) ) ;
}
}
// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count ;
static std : : string NEW_VALUE = " NewValue " ;
class KeepFilter : public CompactionFilter {
public :
virtual bool Filter ( int level , const Slice & key ,
const Slice & value , std : : string * new_value ,
bool * value_changed ) const override {
cfilter_count + + ;
return false ;
}
virtual const char * Name ( ) const override {
return " KeepFilter " ;
}
} ;
class DeleteFilter : public CompactionFilter {
public :
virtual bool Filter ( int level , const Slice & key ,
const Slice & value , std : : string * new_value ,
bool * value_changed ) const override {
cfilter_count + + ;
return true ;
}
virtual const char * Name ( ) const override {
return " DeleteFilter " ;
}
} ;
class ChangeFilter : public CompactionFilter {
public :
explicit ChangeFilter ( ) { }
virtual bool Filter ( int level , const Slice & key ,
const Slice & value , std : : string * new_value ,
bool * value_changed ) const override {
assert ( new_value ! = nullptr ) ;
* new_value = NEW_VALUE ;
* value_changed = true ;
return false ;
}
virtual const char * Name ( ) const override {
return " ChangeFilter " ;
}
} ;
class KeepFilterFactory : public CompactionFilterFactory {
public :
virtual std : : unique_ptr < CompactionFilter >
CreateCompactionFilter ( const CompactionFilter : : Context & context ) override {
return std : : unique_ptr < CompactionFilter > ( new KeepFilter ( ) ) ;
}
virtual const char * Name ( ) const override {
return " KeepFilterFactory " ;
}
} ;
class DeleteFilterFactory : public CompactionFilterFactory {
public :
virtual std : : unique_ptr < CompactionFilter >
CreateCompactionFilter ( const CompactionFilter : : Context & context ) override {
if ( context . is_manual_compaction ) {
return std : : unique_ptr < CompactionFilter > ( new DeleteFilter ( ) ) ;
} else {
return std : : unique_ptr < CompactionFilter > ( nullptr ) ;
}
}
virtual const char * Name ( ) const override {
return " DeleteFilterFactory " ;
}
} ;
class ChangeFilterFactory : public CompactionFilterFactory {
public :
explicit ChangeFilterFactory ( ) { }
virtual std : : unique_ptr < CompactionFilter >
CreateCompactionFilter ( const CompactionFilter : : Context & context ) override {
return std : : unique_ptr < CompactionFilter > ( new ChangeFilter ( ) ) ;
}
virtual const char * Name ( ) const override {
return " ChangeFilterFactory " ;
}
} ;
TEST ( DBTest , CompactionFilter ) {
TEST ( DBTest , CompactionFilter ) {
Options options = CurrentOptions ( ) ;
Options options = CurrentOptions ( ) ;
options . num_levels = 3 ;
options . num_levels = 3 ;
@ -3512,6 +3516,60 @@ TEST(DBTest, CompactionFilterWithValueChange) {
} while ( ChangeCompactOptions ( ) ) ;
} while ( ChangeCompactOptions ( ) ) ;
}
}
TEST ( DBTest , CompactionFilterContextManual ) {
KeepFilterFactory * filter = new KeepFilterFactory ( ) ;
Options options = CurrentOptions ( ) ;
options . compaction_style = kCompactionStyleUniversal ;
options . compaction_filter_factory . reset ( filter ) ;
options . compression = kNoCompression ;
options . level0_file_num_compaction_trigger = 8 ;
Reopen ( & options ) ;
int num_keys_per_file = 400 ;
for ( int j = 0 ; j < 3 ; j + + ) {
// Write several keys.
const std : : string value ( 10 , ' x ' ) ;
for ( int i = 0 ; i < num_keys_per_file ; i + + ) {
char key [ 100 ] ;
snprintf ( key , sizeof ( key ) , " B%08d%02d " , i , j ) ;
Put ( key , value ) ;
}
dbfull ( ) - > TEST_FlushMemTable ( ) ;
// Make sure next file is much smaller so automatic compaction will not
// be triggered.
num_keys_per_file / = 2 ;
}
// Force a manual compaction
cfilter_count = 0 ;
filter - > expect_manual_compaction_ . store ( true ) ;
filter - > expect_full_compaction_ . store ( false ) ; // Manual compaction always
// set this flag.
dbfull ( ) - > CompactRange ( nullptr , nullptr ) ;
ASSERT_EQ ( cfilter_count , 700 ) ;
ASSERT_EQ ( NumTableFilesAtLevel ( 0 ) , 1 ) ;
// Verify total number of keys is correct after manual compaction.
int count = 0 ;
int total = 0 ;
Iterator * iter = dbfull ( ) - > TEST_NewInternalIterator ( ) ;
iter - > SeekToFirst ( ) ;
ASSERT_OK ( iter - > status ( ) ) ;
while ( iter - > Valid ( ) ) {
ParsedInternalKey ikey ( Slice ( ) , 0 , kTypeValue ) ;
ikey . sequence = - 1 ;
ASSERT_EQ ( ParseInternalKey ( iter - > key ( ) , & ikey ) , true ) ;
total + + ;
if ( ikey . sequence ! = 0 ) {
count + + ;
}
iter - > Next ( ) ;
}
ASSERT_EQ ( total , 700 ) ;
ASSERT_EQ ( count , 1 ) ;
delete iter ;
}
TEST ( DBTest , SparseMerge ) {
TEST ( DBTest , SparseMerge ) {
do {
do {
Options options = CurrentOptions ( ) ;
Options options = CurrentOptions ( ) ;