@ -92,6 +92,294 @@ TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
VerifyDBInternal ( { { " k1 " , " v1 " } , { " k2 " , " corrupted " } , { " k2 " , " v2 " } } ) ;
}
class MergeOperatorPinningTest : public DBMergeOperatorTest ,
public testing : : WithParamInterface < bool > {
public :
MergeOperatorPinningTest ( ) { disable_block_cache_ = GetParam ( ) ; }
bool disable_block_cache_ ;
} ;
INSTANTIATE_TEST_CASE_P ( MergeOperatorPinningTest , MergeOperatorPinningTest ,
: : testing : : Bool ( ) ) ;
# ifndef ROCKSDB_LITE
TEST_P ( MergeOperatorPinningTest , OperandsMultiBlocks ) {
Options options = CurrentOptions ( ) ;
BlockBasedTableOptions table_options ;
table_options . block_size = 1 ; // every block will contain one entry
table_options . no_block_cache = disable_block_cache_ ;
options . table_factory . reset ( NewBlockBasedTableFactory ( table_options ) ) ;
options . merge_operator = MergeOperators : : CreateStringAppendTESTOperator ( ) ;
options . level0_slowdown_writes_trigger = ( 1 < < 30 ) ;
options . level0_stop_writes_trigger = ( 1 < < 30 ) ;
options . disable_auto_compactions = true ;
DestroyAndReopen ( options ) ;
const int kKeysPerFile = 10 ;
const int kOperandsPerKeyPerFile = 7 ;
const int kOperandSize = 100 ;
// Filse to write in L0 before compacting to lower level
const int kFilesPerLevel = 3 ;
Random rnd ( 301 ) ;
std : : map < std : : string , std : : string > true_data ;
int batch_num = 1 ;
int lvl_to_fill = 4 ;
int key_id = 0 ;
while ( true ) {
for ( int j = 0 ; j < kKeysPerFile ; j + + ) {
std : : string key = Key ( key_id % 35 ) ;
key_id + + ;
for ( int k = 0 ; k < kOperandsPerKeyPerFile ; k + + ) {
std : : string val = RandomString ( & rnd , kOperandSize ) ;
ASSERT_OK ( db_ - > Merge ( WriteOptions ( ) , key , val ) ) ;
if ( true_data [ key ] . size ( ) = = 0 ) {
true_data [ key ] = val ;
} else {
true_data [ key ] + = " , " + val ;
}
}
}
if ( lvl_to_fill = = - 1 ) {
// Keep last batch in memtable and stop
break ;
}
ASSERT_OK ( Flush ( ) ) ;
if ( batch_num % kFilesPerLevel = = 0 ) {
if ( lvl_to_fill ! = 0 ) {
MoveFilesToLevel ( lvl_to_fill ) ;
}
lvl_to_fill - - ;
}
batch_num + + ;
}
// 3 L0 files
// 1 L1 file
// 3 L2 files
// 1 L3 file
// 3 L4 Files
ASSERT_EQ ( FilesPerLevel ( ) , " 3,1,3,1,3 " ) ;
VerifyDBFromMap ( true_data ) ;
}
TEST_P ( MergeOperatorPinningTest , Randomized ) {
do {
Options options = CurrentOptions ( ) ;
options . merge_operator = MergeOperators : : CreateMaxOperator ( ) ;
BlockBasedTableOptions table_options ;
table_options . no_block_cache = disable_block_cache_ ;
options . table_factory . reset ( NewBlockBasedTableFactory ( table_options ) ) ;
DestroyAndReopen ( options ) ;
Random rnd ( 301 ) ;
std : : map < std : : string , std : : string > true_data ;
const int kTotalMerges = 10000 ;
// Every key gets ~10 operands
const int kKeyRange = kTotalMerges / 10 ;
const int kOperandSize = 20 ;
const int kNumPutBefore = kKeyRange / 10 ; // 10% value
const int kNumPutAfter = kKeyRange / 10 ; // 10% overwrite
const int kNumDelete = kKeyRange / 10 ; // 10% delete
// kNumPutBefore keys will have base values
for ( int i = 0 ; i < kNumPutBefore ; i + + ) {
std : : string key = Key ( rnd . Next ( ) % kKeyRange ) ;
std : : string value = RandomString ( & rnd , kOperandSize ) ;
ASSERT_OK ( db_ - > Put ( WriteOptions ( ) , key , value ) ) ;
true_data [ key ] = value ;
}
// Do kTotalMerges merges
for ( int i = 0 ; i < kTotalMerges ; i + + ) {
std : : string key = Key ( rnd . Next ( ) % kKeyRange ) ;
std : : string value = RandomString ( & rnd , kOperandSize ) ;
ASSERT_OK ( db_ - > Merge ( WriteOptions ( ) , key , value ) ) ;
if ( true_data [ key ] < value ) {
true_data [ key ] = value ;
}
}
// Overwrite random kNumPutAfter keys
for ( int i = 0 ; i < kNumPutAfter ; i + + ) {
std : : string key = Key ( rnd . Next ( ) % kKeyRange ) ;
std : : string value = RandomString ( & rnd , kOperandSize ) ;
ASSERT_OK ( db_ - > Put ( WriteOptions ( ) , key , value ) ) ;
true_data [ key ] = value ;
}
// Delete random kNumDelete keys
for ( int i = 0 ; i < kNumDelete ; i + + ) {
std : : string key = Key ( rnd . Next ( ) % kKeyRange ) ;
ASSERT_OK ( db_ - > Delete ( WriteOptions ( ) , key ) ) ;
true_data . erase ( key ) ;
}
VerifyDBFromMap ( true_data ) ;
// Skip HashCuckoo since it does not support merge operators
} while ( ChangeOptions ( kSkipMergePut | kSkipHashCuckoo ) ) ;
}
class MergeOperatorHook : public MergeOperator {
public :
explicit MergeOperatorHook ( std : : shared_ptr < MergeOperator > _merge_op )
: merge_op_ ( _merge_op ) { }
virtual bool FullMergeV2 ( const MergeOperationInput & merge_in ,
MergeOperationOutput * merge_out ) const override {
before_merge_ ( ) ;
bool res = merge_op_ - > FullMergeV2 ( merge_in , merge_out ) ;
after_merge_ ( ) ;
return res ;
}
virtual const char * Name ( ) const override { return merge_op_ - > Name ( ) ; }
std : : shared_ptr < MergeOperator > merge_op_ ;
std : : function < void ( ) > before_merge_ = [ ] ( ) { } ;
std : : function < void ( ) > after_merge_ = [ ] ( ) { } ;
} ;
TEST_P ( MergeOperatorPinningTest , EvictCacheBeforeMerge ) {
Options options = CurrentOptions ( ) ;
auto merge_hook =
std : : make_shared < MergeOperatorHook > ( MergeOperators : : CreateMaxOperator ( ) ) ;
options . merge_operator = merge_hook ;
options . disable_auto_compactions = true ;
options . level0_slowdown_writes_trigger = ( 1 < < 30 ) ;
options . level0_stop_writes_trigger = ( 1 < < 30 ) ;
options . max_open_files = 20 ;
BlockBasedTableOptions bbto ;
bbto . no_block_cache = disable_block_cache_ ;
if ( bbto . no_block_cache = = false ) {
bbto . block_cache = NewLRUCache ( 64 * 1024 * 1024 ) ;
} else {
bbto . block_cache = nullptr ;
}
options . table_factory . reset ( NewBlockBasedTableFactory ( bbto ) ) ;
DestroyAndReopen ( options ) ;
const int kNumOperands = 30 ;
const int kNumKeys = 1000 ;
const int kOperandSize = 100 ;
Random rnd ( 301 ) ;
// 1000 keys every key have 30 operands, every operand is in a different file
std : : map < std : : string , std : : string > true_data ;
for ( int i = 0 ; i < kNumOperands ; i + + ) {
for ( int j = 0 ; j < kNumKeys ; j + + ) {
std : : string k = Key ( j ) ;
std : : string v = RandomString ( & rnd , kOperandSize ) ;
ASSERT_OK ( db_ - > Merge ( WriteOptions ( ) , k , v ) ) ;
true_data [ k ] = std : : max ( true_data [ k ] , v ) ;
}
ASSERT_OK ( Flush ( ) ) ;
}
std : : vector < uint64_t > file_numbers = ListTableFiles ( env_ , dbname_ ) ;
ASSERT_EQ ( file_numbers . size ( ) , kNumOperands ) ;
int merge_cnt = 0 ;
// Code executed before merge operation
merge_hook - > before_merge_ = [ & ] ( ) {
// Evict all tables from cache before every merge operation
for ( uint64_t num : file_numbers ) {
TableCache : : Evict ( dbfull ( ) - > TEST_table_cache ( ) , num ) ;
}
// Decrease cache capacity to force all unrefed blocks to be evicted
if ( bbto . block_cache ) {
bbto . block_cache - > SetCapacity ( 1 ) ;
}
merge_cnt + + ;
} ;
// Code executed after merge operation
merge_hook - > after_merge_ = [ & ] ( ) {
// Increase capacity again after doing the merge
if ( bbto . block_cache ) {
bbto . block_cache - > SetCapacity ( 64 * 1024 * 1024 ) ;
}
} ;
size_t total_reads ;
VerifyDBFromMap ( true_data , & total_reads ) ;
ASSERT_EQ ( merge_cnt , total_reads ) ;
db_ - > CompactRange ( CompactRangeOptions ( ) , nullptr , nullptr ) ;
VerifyDBFromMap ( true_data , & total_reads ) ;
}
TEST_P ( MergeOperatorPinningTest , TailingIterator ) {
Options options = CurrentOptions ( ) ;
options . merge_operator = MergeOperators : : CreateMaxOperator ( ) ;
BlockBasedTableOptions bbto ;
bbto . no_block_cache = disable_block_cache_ ;
options . table_factory . reset ( NewBlockBasedTableFactory ( bbto ) ) ;
DestroyAndReopen ( options ) ;
const int kNumOperands = 100 ;
const int kNumWrites = 100000 ;
std : : function < void ( ) > writer_func = [ & ] ( ) {
int k = 0 ;
for ( int i = 0 ; i < kNumWrites ; i + + ) {
db_ - > Merge ( WriteOptions ( ) , Key ( k ) , Key ( k ) ) ;
if ( i & & i % kNumOperands = = 0 ) {
k + + ;
}
if ( i & & i % 127 = = 0 ) {
ASSERT_OK ( Flush ( ) ) ;
}
if ( i & & i % 317 = = 0 ) {
ASSERT_OK ( db_ - > CompactRange ( CompactRangeOptions ( ) , nullptr , nullptr ) ) ;
}
}
} ;
std : : function < void ( ) > reader_func = [ & ] ( ) {
ReadOptions ro ;
ro . tailing = true ;
Iterator * iter = db_ - > NewIterator ( ro ) ;
iter - > SeekToFirst ( ) ;
for ( int i = 0 ; i < ( kNumWrites / kNumOperands ) ; i + + ) {
while ( ! iter - > Valid ( ) ) {
// wait for the key to be written
env_ - > SleepForMicroseconds ( 100 ) ;
iter - > Seek ( Key ( i ) ) ;
}
ASSERT_EQ ( iter - > key ( ) , Key ( i ) ) ;
ASSERT_EQ ( iter - > value ( ) , Key ( i ) ) ;
iter - > Next ( ) ;
}
delete iter ;
} ;
rocksdb : : port : : Thread writer_thread ( writer_func ) ;
rocksdb : : port : : Thread reader_thread ( reader_func ) ;
writer_thread . join ( ) ;
reader_thread . join ( ) ;
}
# endif // ROCKSDB_LITE
} // namespace rocksdb
int main ( int argc , char * * argv ) {