@ -1331,7 +1331,42 @@ class CompactionCompressionListener : public EventListener {
const Options * db_options_ ;
} ;
TEST_F ( DBTest2 , CompressionFailures ) {
enum CompressionFailureType {
kTestCompressionFail ,
kTestDecompressionFail ,
kTestDecompressionCorruption
} ;
class CompressionFailuresTest
: public DBTest2 ,
public testing : : WithParamInterface < std : : tuple <
CompressionFailureType , CompressionType , uint32_t , uint32_t > > {
public :
CompressionFailuresTest ( ) {
std : : tie ( compression_failure_type_ , compression_type_ ,
compression_max_dict_bytes_ , compression_parallel_threads_ ) =
GetParam ( ) ;
}
CompressionFailureType compression_failure_type_ = kTestCompressionFail ;
CompressionType compression_type_ = kNoCompression ;
uint32_t compression_max_dict_bytes_ = 0 ;
uint32_t compression_parallel_threads_ = 0 ;
} ;
INSTANTIATE_TEST_CASE_P (
DBTest2 , CompressionFailuresTest ,
: : testing : : Combine ( : : testing : : Values ( kTestCompressionFail ,
kTestDecompressionFail ,
kTestDecompressionCorruption ) ,
: : testing : : ValuesIn ( GetSupportedCompressions ( ) ) ,
: : testing : : Values ( 0 , 10 ) , : : testing : : Values ( 1 , 4 ) ) ) ;
TEST_P ( CompressionFailuresTest , CompressionFailures ) {
if ( compression_type_ = = kNoCompression ) {
return ;
}
Options options = CurrentOptions ( ) ;
options . level0_file_num_compaction_trigger = 2 ;
options . max_bytes_for_level_base = 1024 ;
@ -1345,50 +1380,41 @@ TEST_F(DBTest2, CompressionFailures) {
table_options . verify_compression = true ;
options . table_factory . reset ( new BlockBasedTableFactory ( table_options ) ) ;
enum CompressionFailureType {
kTestCompressionFail ,
kTestDecompressionFail ,
kTestDecompressionCorruption
} curr_compression_failure_type ;
std : : vector < CompressionFailureType > compression_failure_types = {
kTestCompressionFail , kTestDecompressionFail ,
kTestDecompressionCorruption } ;
options . compression = compression_type_ ;
options . compression_opts . parallel_threads = compression_parallel_threads_ ;
options . compression_opts . max_dict_bytes = compression_max_dict_bytes_ ;
options . bottommost_compression_opts . parallel_threads =
compression_parallel_threads_ ;
options . bottommost_compression_opts . max_dict_bytes =
compression_max_dict_bytes_ ;
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue " ,
[ & curr_compression_failure_type ] ( void * arg ) {
bool * ret = static_cast < bool * > ( arg ) ;
if ( curr_compression_failure_type = = kTestCompressionFail ) {
if ( compression_failure_type_ = = kTestCompressionFail ) {
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" BlockBasedTableBuilder::CompressBlockInternal:TamperWithReturnValue " ,
[ ] ( void * arg ) {
bool * ret = static_cast < bool * > ( arg ) ;
* ret = false ;
}
} ) ;
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" UncompressBlockContentsForCompressionType:TamperWithReturnValue " ,
[ & curr_compression_failure_type ] ( void * arg ) {
Status * ret = static_cast < Status * > ( arg ) ;
ASSERT_OK ( * ret ) ;
if ( curr_compression_failure_type = = kTestDecompressionFail ) {
} ) ;
} else if ( compression_failure_type_ = = kTestDecompressionFail ) {
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" UncompressBlockContentsForCompressionType:TamperWithReturnValue " ,
[ ] ( void * arg ) {
Status * ret = static_cast < Status * > ( arg ) ;
ASSERT_OK ( * ret ) ;
* ret = Status : : Corruption ( " kTestDecompressionFail " ) ;
}
} ) ;
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" UncompressBlockContentsForCompressionType: "
" TamperWithDecompressionOutput " ,
[ & curr_compression_failure_type ] ( void * arg ) {
if ( curr_compression_failure_type = = kTestDecompressionCorruption ) {
} ) ;
} else if ( compression_failure_type_ = = kTestDecompressionCorruption ) {
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > SetCallBack (
" UncompressBlockContentsForCompressionType: "
" TamperWithDecompressionOutput " ,
[ ] ( void * arg ) {
BlockContents * contents = static_cast < BlockContents * > ( arg ) ;
// Ensure uncompressed data != original data
const size_t len = contents - > data . size ( ) + 1 ;
std : : unique_ptr < char [ ] > fake_data ( new char [ len ] ( ) ) ;
* contents = BlockContents ( std : : move ( fake_data ) , len ) ;
}
} ) ;
std : : vector < CompressionType > compression_types = GetSupportedCompressions ( ) ;
std : : vector < uint32_t > compression_max_dict_bytes = { 0 , 10 } ;
std : : vector < uint32_t > compression_parallel_threads = { 1 , 4 } ;
} ) ;
}
std : : map < std : : string , std : : string > key_value_written ;
@ -1399,74 +1425,55 @@ TEST_F(DBTest2, CompressionFailures) {
Status s = Status : : OK ( ) ;
for ( auto compression_failure_type : compression_failure_types ) {
curr_compression_failure_type = compression_failure_type ;
for ( auto compression_type : compression_types ) {
if ( compression_type = = kNoCompression ) {
continue ;
DestroyAndReopen ( options ) ;
// Write 10 random files
for ( int i = 0 ; i < 10 ; i + + ) {
for ( int j = 0 ; j < 5 ; j + + ) {
std : : string key = RandomString ( & rnd , kKeySize ) ;
// Ensure good compression ratio
std : : string valueUnit = RandomString ( & rnd , kValUnitSize ) ;
std : : string value ;
for ( int k = 0 ; k < kValSize ; k + = kValUnitSize ) {
value + = valueUnit ;
}
for ( auto parallel_threads : compression_parallel_threads ) {
for ( auto max_dict_bytes : compression_max_dict_bytes ) {
options . compression = compression_type ;
options . compression_opts . parallel_threads = parallel_threads ;
options . compression_opts . max_dict_bytes = max_dict_bytes ;
options . bottommost_compression_opts . parallel_threads =
parallel_threads ;
options . bottommost_compression_opts . max_dict_bytes = max_dict_bytes ;
DestroyAndReopen ( options ) ;
// Write 10 random files
for ( int i = 0 ; i < 10 ; i + + ) {
for ( int j = 0 ; j < 5 ; j + + ) {
std : : string key = RandomString ( & rnd , kKeySize ) ;
// Ensure good compression ratio
std : : string valueUnit = RandomString ( & rnd , kValUnitSize ) ;
std : : string value ;
for ( int k = 0 ; k < kValSize ; k + = kValUnitSize ) {
value + = valueUnit ;
}
s = Put ( key , value ) ;
if ( compression_failure_type = = kTestCompressionFail ) {
key_value_written [ key ] = value ;
ASSERT_OK ( s ) ;
}
}
s = Flush ( ) ;
if ( compression_failure_type = = kTestCompressionFail ) {
ASSERT_OK ( s ) ;
}
s = dbfull ( ) - > TEST_WaitForCompact ( ) ;
if ( compression_failure_type = = kTestCompressionFail ) {
ASSERT_OK ( s ) ;
}
if ( i = = 4 ) {
// Make compression fail at the mid of table building
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > EnableProcessing ( ) ;
}
}
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > DisableProcessing ( ) ;
if ( compression_failure_type = = kTestCompressionFail ) {
// Should be kNoCompression, check content consistency
std : : unique_ptr < Iterator > db_iter ( db_ - > NewIterator ( ReadOptions ( ) ) ) ;
for ( db_iter - > SeekToFirst ( ) ; db_iter - > Valid ( ) ; db_iter - > Next ( ) ) {
std : : string key = db_iter - > key ( ) . ToString ( ) ;
std : : string value = db_iter - > value ( ) . ToString ( ) ;
ASSERT_NE ( key_value_written . find ( key ) , key_value_written . end ( ) ) ;
ASSERT_EQ ( key_value_written [ key ] , value ) ;
key_value_written . erase ( key ) ;
}
ASSERT_EQ ( 0 , key_value_written . size ( ) ) ;
} else if ( compression_failure_type = = kTestDecompressionFail ) {
ASSERT_EQ ( std : : string ( s . getState ( ) ) ,
" Could not decompress: kTestDecompressionFail " ) ;
} else if ( compression_failure_type = = kTestDecompressionCorruption ) {
ASSERT_EQ ( std : : string ( s . getState ( ) ) ,
" Decompressed block did not match raw block " ) ;
}
}
s = Put ( key , value ) ;
if ( compression_failure_type_ = = kTestCompressionFail ) {
key_value_written [ key ] = value ;
ASSERT_OK ( s ) ;
}
}
s = Flush ( ) ;
if ( compression_failure_type_ = = kTestCompressionFail ) {
ASSERT_OK ( s ) ;
}
s = dbfull ( ) - > TEST_WaitForCompact ( ) ;
if ( compression_failure_type_ = = kTestCompressionFail ) {
ASSERT_OK ( s ) ;
}
if ( i = = 4 ) {
// Make compression fail at the mid of table building
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > EnableProcessing ( ) ;
}
}
ROCKSDB_NAMESPACE : : SyncPoint : : GetInstance ( ) - > DisableProcessing ( ) ;
if ( compression_failure_type_ = = kTestCompressionFail ) {
// Should be kNoCompression, check content consistency
std : : unique_ptr < Iterator > db_iter ( db_ - > NewIterator ( ReadOptions ( ) ) ) ;
for ( db_iter - > SeekToFirst ( ) ; db_iter - > Valid ( ) ; db_iter - > Next ( ) ) {
std : : string key = db_iter - > key ( ) . ToString ( ) ;
std : : string value = db_iter - > value ( ) . ToString ( ) ;
ASSERT_NE ( key_value_written . find ( key ) , key_value_written . end ( ) ) ;
ASSERT_EQ ( key_value_written [ key ] , value ) ;
key_value_written . erase ( key ) ;
}
ASSERT_EQ ( 0 , key_value_written . size ( ) ) ;
} else if ( compression_failure_type_ = = kTestDecompressionFail ) {
ASSERT_EQ ( std : : string ( s . getState ( ) ) ,
" Could not decompress: kTestDecompressionFail " ) ;
} else if ( compression_failure_type_ = = kTestDecompressionCorruption ) {
ASSERT_EQ ( std : : string ( s . getState ( ) ) ,
" Decompressed block did not match raw block " ) ;
}
}