@ -255,112 +255,53 @@ Status ReadBlock(RandomAccessFile* file, const Footer& footer,
return s ;
}
// Decompress a block according to params
// May need to malloc a space for cache usage
Status DecompressBlock ( BlockContents * result , size_t block_size ,
bool do_uncompress , const char * buf ,
const Slice & contents , bool use_stack_buf ) {
Status s ;
size_t n = block_size ;
const char * data = contents . data ( ) ;
result - > data = Slice ( ) ;
result - > cachable = false ;
result - > heap_allocated = false ;
PERF_TIMER_GUARD ( block_decompress_time ) ;
rocksdb : : CompressionType compression_type =
static_cast < rocksdb : : CompressionType > ( data [ n ] ) ;
// If the caller has requested that the block not be uncompressed
if ( ! do_uncompress | | compression_type = = kNoCompression ) {
if ( data ! = buf ) {
// File implementation gave us pointer to some other data.
// Use it directly under the assumption that it will be live
// while the file is open.
result - > data = Slice ( data , n ) ;
result - > heap_allocated = false ;
result - > cachable = false ; // Do not double-cache
} else {
if ( use_stack_buf ) {
// Need to allocate space in heap for cache usage
char * new_buf = new char [ n ] ;
memcpy ( new_buf , buf , n ) ;
result - > data = Slice ( new_buf , n ) ;
} else {
result - > data = Slice ( buf , n ) ;
}
result - > heap_allocated = true ;
result - > cachable = true ;
}
result - > compression_type = compression_type ;
s = Status : : OK ( ) ;
Status ReadBlockContents ( RandomAccessFile * file , const Footer & footer ,
const ReadOptions & options , const BlockHandle & handle ,
BlockContents * contents , Env * env ,
bool decompression_requested ) {
Status status ;
Slice slice ;
size_t n = static_cast < size_t > ( handle . size ( ) ) ;
std : : unique_ptr < char [ ] > heap_buf ;
char stack_buf [ DefaultStackBufferSize ] ;
char * used_buf = nullptr ;
rocksdb : : CompressionType compression_type ;
if ( decompression_requested & & n + kBlockTrailerSize < DefaultStackBufferSize ) {
//If we've got a small enough hunk of data, read it in to the
//trivially allocated stack buffer instead of needing a full malloc()
used_buf = & stack_buf [ 0 ] ;
} else {
s = UncompressBlockContents ( data , n , result ) ;
heap_buf = std : : unique_ptr < char [ ] > ( new char [ n + kBlockTrailerSize ] ) ;
used_buf = heap_buf . get ( ) ;
}
return s ;
}
// Read and Decompress block
// Use buf in stack as temp reading buffer
Status ReadAndDecompressFast ( RandomAccessFile * file , const Footer & footer ,
const ReadOptions & options ,
const BlockHandle & handle , BlockContents * result ,
Env * env , bool do_uncompress ) {
Status s ;
Slice contents ;
size_t n = static_cast < size_t > ( handle . size ( ) ) ;
char buf [ DefaultStackBufferSize ] ;
status = ReadBlock ( file , footer , options , handle , & slice , used_buf ) ;
s = ReadBlock ( file , footer , options , handle , & contents , buf ) ;
if ( ! s . ok ( ) ) {
return s ;
}
s = DecompressBlock ( result , n , do_uncompress , buf , contents , true ) ;
if ( ! s . ok ( ) ) {
return s ;
if ( ! status . ok ( ) ) {
return status ;
}
return s ;
}
// Read and Decompress block
// Use buf in heap as temp reading buffer
Status ReadAndDecompress ( RandomAccessFile * file , const Footer & footer ,
const ReadOptions & options , const BlockHandle & handle ,
BlockContents * result , Env * env , bool do_uncompress ) {
Status s ;
Slice contents ;
size_t n = static_cast < size_t > ( handle . size ( ) ) ;
char * buf = new char [ n + kBlockTrailerSize ] ;
PERF_TIMER_GUARD ( block_decompress_time ) ;
s = ReadBlock ( file , footer , options , handle , & contents , buf ) ;
if ( ! s . ok ( ) ) {
delete [ ] buf ;
return s ;
}
s = DecompressBlock ( result , n , do_uncompress , buf , contents , false ) ;
if ( ! s . ok ( ) ) {
delete [ ] buf ;
return s ;
compression_type = static_cast < rocksdb : : CompressionType > ( slice . data ( ) [ n ] ) ;
if ( decompression_requested & & compression_type ! = kNoCompression ) {
return UncompressBlockContents ( slice . data ( ) , n , contents ) ;
}
if ( result - > data . data ( ) ! = buf ) {
delete [ ] buf ;
if ( slice . data ( ) ! = used_buf ) {
* contents = BlockContents ( Slice ( slice . data ( ) , n ) , false , compression_type ) ;
return status ;
}
return s ;
}
Status ReadBlockContents ( RandomAccessFile * file , const Footer & footer ,
const ReadOptions & options , const BlockHandle & handle ,
BlockContents * result , Env * env , bool do_uncompress ) {
size_t n = static_cast < size_t > ( handle . size ( ) ) ;
if ( do_uncompress & & n + kBlockTrailerSize < DefaultStackBufferSize ) {
return ReadAndDecompressFast ( file , footer , options , handle , result , env ,
do_uncompress ) ;
} else {
return ReadAndDecompress ( file , footer , options , handle , result , env ,
do_uncompress ) ;
if ( used_buf = = & stack_buf [ 0 ] ) {
heap_buf = std : : unique_ptr < char [ ] > ( new char [ n ] ) ;
memcpy ( heap_buf . get ( ) , stack_buf , n ) ;
}
* contents = BlockContents ( std : : move ( heap_buf ) , n , true , compression_type ) ;
return status ;
}
//
@ -370,8 +311,8 @@ Status ReadBlockContents(RandomAccessFile* file, const Footer& footer,
// buffer is returned via 'result' and it is upto the caller to
// free this buffer.
Status UncompressBlockContents ( const char * data , size_t n ,
BlockContents * result ) {
char * ubuf = nullptr ;
BlockContents * contents ) {
std : : unique_ptr < char [ ] > ubuf ;
int decompress_size = 0 ;
assert ( data [ n ] ! = kNoCompression ) ;
switch ( data [ n ] ) {
@ -382,64 +323,52 @@ Status UncompressBlockContents(const char* data, size_t n,
if ( ! port : : Snappy_GetUncompressedLength ( data , n , & ulength ) ) {
return Status : : Corruption ( snappy_corrupt_msg ) ;
}
ubuf = new char [ ulength ] ;
if ( ! port : : Snappy_Uncompress ( data , n , ubuf ) ) {
delete [ ] ubuf ;
ubuf = std : : unique_ptr < char [ ] > ( new char [ ulength ] ) ;
if ( ! port : : Snappy_Uncompress ( data , n , ubuf . get ( ) ) ) {
return Status : : Corruption ( snappy_corrupt_msg ) ;
}
result - > data = Slice ( ubuf , ulength ) ;
result - > heap_allocated = true ;
result - > cachable = true ;
* contents = BlockContents ( std : : move ( ubuf ) , ulength , true , kNoCompression ) ;
break ;
}
case kZlibCompression :
ubuf = port : : Zlib_Uncompress ( data , n , & decompress_size ) ;
ubuf = std : : unique_ptr < char [ ] > ( port : : Zlib_Uncompress ( data , n , & decompress_size ) ) ;
static char zlib_corrupt_msg [ ] =
" Zlib not supported or corrupted Zlib compressed block contents " ;
if ( ! ubuf ) {
return Status : : Corruption ( zlib_corrupt_msg ) ;
}
result - > data = Slice ( ubuf , decompress_size ) ;
result - > heap_allocated = true ;
result - > cachable = true ;
* contents = BlockContents ( std : : move ( ubuf ) , decompress_size , true , kNoCompression ) ;
break ;
case kBZip2Compression :
ubuf = port : : BZip2_Uncompress ( data , n , & decompress_size ) ;
ubuf = std : : unique_ptr < char [ ] > ( port : : BZip2_Uncompress ( data , n , & decompress_size ) ) ;
static char bzip2_corrupt_msg [ ] =
" Bzip2 not supported or corrupted Bzip2 compressed block contents " ;
if ( ! ubuf ) {
return Status : : Corruption ( bzip2_corrupt_msg ) ;
}
result - > data = Slice ( ubuf , decompress_size ) ;
result - > heap_allocated = true ;
result - > cachable = true ;
* contents = BlockContents ( std : : move ( ubuf ) , decompress_size , true , kNoCompression ) ;
break ;
case kLZ4Compression :
ubuf = port : : LZ4_Uncompress ( data , n , & decompress_size ) ;
ubuf = std : : unique_ptr < char [ ] > ( port : : LZ4_Uncompress ( data , n , & decompress_size ) ) ;
static char lz4_corrupt_msg [ ] =
" LZ4 not supported or corrupted LZ4 compressed block contents " ;
if ( ! ubuf ) {
return Status : : Corruption ( lz4_corrupt_msg ) ;
}
result - > data = Slice ( ubuf , decompress_size ) ;
result - > heap_allocated = true ;
result - > cachable = true ;
* contents = BlockContents ( std : : move ( ubuf ) , decompress_size , true , kNoCompression ) ;
break ;
case kLZ4HCCompression :
ubuf = port : : LZ4_Uncompress ( data , n , & decompress_size ) ;
ubuf = std : : unique_ptr < char [ ] > ( port : : LZ4_Uncompress ( data , n , & decompress_size ) ) ;
static char lz4hc_corrupt_msg [ ] =
" LZ4HC not supported or corrupted LZ4HC compressed block contents " ;
if ( ! ubuf ) {
return Status : : Corruption ( lz4hc_corrupt_msg ) ;
}
result - > data = Slice ( ubuf , decompress_size ) ;
result - > heap_allocated = true ;
result - > cachable = true ;
* contents = BlockContents ( std : : move ( ubuf ) , decompress_size , true , kNoCompression ) ;
break ;
default :
return Status : : Corruption ( " bad block type " ) ;
}
result - > compression_type = kNoCompression ; // not compressed any more
return Status : : OK ( ) ;
}