@ -285,12 +285,9 @@ struct BlockBasedTableBuilder::Rep {
const InternalKeyComparator & internal_comparator ;
WritableFileWriter * file ;
std : : atomic < uint64_t > offset ;
Status status ;
IOStatus io_status ;
// Synchronize status & io_status accesses across threads from main thread,
// compression thread and write thread in parallel compression.
std : : mutex status_mutex ;
std : : mutex io_status_mutex ;
size_t alignment ;
BlockBuilder data_block ;
// Buffers uncompressed data blocks and keys to replay later. Needed when
@ -369,6 +366,61 @@ struct BlockBasedTableBuilder::Rep {
uint64_t get_offset ( ) { return offset . load ( std : : memory_order_relaxed ) ; }
void set_offset ( uint64_t o ) { offset . store ( o , std : : memory_order_relaxed ) ; }
const IOStatus & GetIOStatus ( ) {
if ( compression_opts . parallel_threads > 1 ) {
std : : lock_guard < std : : mutex > lock ( status_mutex ) ;
return io_status ;
} else {
return io_status ;
}
}
const Status & GetStatus ( ) {
if ( compression_opts . parallel_threads > 1 ) {
std : : lock_guard < std : : mutex > lock ( status_mutex ) ;
return status ;
} else {
return status ;
}
}
void SyncStatusFromIOStatus ( ) {
if ( compression_opts . parallel_threads > 1 ) {
std : : lock_guard < std : : mutex > lock ( status_mutex ) ;
if ( status . ok ( ) ) {
status = io_status ;
}
} else if ( status . ok ( ) ) {
status = io_status ;
}
}
// Never erase an existing status that is not OK.
void SetStatus ( Status s ) {
if ( ! s . ok ( ) ) {
// Locking is an overkill for non compression_opts.parallel_threads
// case but since it's unlikely that s is not OK, we take this cost
// to be simplicity.
std : : lock_guard < std : : mutex > lock ( status_mutex ) ;
if ( status . ok ( ) ) {
status = s ;
}
}
}
// Never erase an existing I/O status that is not OK.
void SetIOStatus ( IOStatus ios ) {
if ( ! ios . ok ( ) ) {
// Locking is an overkill for non compression_opts.parallel_threads
// case but since it's unlikely that s is not OK, we take this cost
// to be simplicity.
std : : lock_guard < std : : mutex > lock ( status_mutex ) ;
if ( io_status . ok ( ) ) {
io_status = ios ;
}
}
}
Rep ( const ImmutableCFOptions & _ioptions , const MutableCFOptions & _moptions ,
const BlockBasedTableOptions & table_opt ,
const InternalKeyComparator & icomparator ,
@ -470,6 +522,10 @@ struct BlockBasedTableBuilder::Rep {
Rep & operator = ( const Rep & ) = delete ;
~ Rep ( ) { }
private :
Status status ;
IOStatus io_status ;
} ;
struct BlockBasedTableBuilder : : ParallelCompressionRep {
@ -860,9 +916,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
r - > data_begin_offset + = r - > data_block_and_keys_buffers . back ( ) . first . size ( ) ;
return ;
}
Status compress_status ;
CompressAndVerifyBlock ( raw_block_contents , is_data_block ,
* ( r - > compression_ctxs [ 0 ] ) , r - > verify_ctxs [ 0 ] . get ( ) ,
r - > compressed_output , block_contents , type , r - > status ) ;
& ( r - > compressed_output ) , & ( block_contents ) , & type ,
& compress_status ) ;
r - > SetStatus ( compress_status ) ;
if ( ! ok ( ) ) {
return ;
}
@ -883,8 +942,9 @@ void BlockBasedTableBuilder::BGWorkCompression(
while ( rep_ - > pc_rep - > compress_queue . pop ( block_rep ) ) {
CompressAndVerifyBlock ( block_rep - > contents , true , /* is_data_block*/
compression_ctx , verify_ctx ,
* ( block_rep - > compressed_data ) , block_rep - > contents ,
block_rep - > compression_type , block_rep - > status ) ;
block_rep - > compressed_data . get ( ) ,
& block_rep - > contents , & ( block_rep - > compression_type ) ,
& block_rep - > status ) ;
block_rep - > slot - > Fill ( block_rep ) ;
}
}
@ -892,8 +952,8 @@ void BlockBasedTableBuilder::BGWorkCompression(
void BlockBasedTableBuilder : : CompressAndVerifyBlock (
const Slice & raw_block_contents , bool is_data_block ,
CompressionContext & compression_ctx , UncompressionContext * verify_ctx_ptr ,
std : : string & compressed_output , Slice & block_contents ,
CompressionType & type , Status & out_status ) {
std : : string * compressed_output , Slice * block_contents ,
CompressionType * type , Status * out_status ) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
@ -901,7 +961,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
assert ( ok ( ) ) ;
Rep * r = rep_ ;
type = r - > compression_type ;
* type = r - > compression_type ;
uint64_t sample_for_compression = r - > sample_for_compression ;
bool abort_compression = false ;
@ -918,15 +978,15 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
}
assert ( compression_dict ! = nullptr ) ;
CompressionInfo compression_info ( r - > compression_opts , compression_ctx ,
* compression_dict , type ,
* compression_dict , * type ,
sample_for_compression ) ;
std : : string sampled_output_fast ;
std : : string sampled_output_slow ;
block_contents = CompressBlock (
raw_block_contents , compression_info , & type ,
* block_contents = CompressBlock (
raw_block_contents , compression_info , type ,
r - > table_options . format_version , is_data_block /* do_sample */ ,
& compressed_output , & sampled_output_fast , & sampled_output_slow ) ;
compressed_output , & sampled_output_fast , & sampled_output_slow ) ;
// notify collectors on block add
NotifyCollectTableCollectorsOnBlockAdd (
@ -936,7 +996,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
// Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the
// compressed data and compare to the input.
if ( type ! = kNoCompression & & r - > table_options . verify_compression ) {
if ( * type ! = kNoCompression & & r - > table_options . verify_compression ) {
// Retrieve the uncompressed contents into a new buffer
const UncompressionDict * verify_dict ;
if ( ! is_data_block | | r - > verify_dict = = nullptr ) {
@ -949,7 +1009,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
UncompressionInfo uncompression_info ( * verify_ctx_ptr , * verify_dict ,
r - > compression_type ) ;
Status stat = UncompressBlockContentsForCompressionType (
uncompression_info , block_contents . data ( ) , block_contents . size ( ) ,
uncompression_info , block_contents - > data ( ) , block_contents - > size ( ) ,
& contents , r - > table_options . format_version , r - > ioptions ) ;
if ( stat . ok ( ) ) {
@ -959,12 +1019,12 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
abort_compression = true ;
ROCKS_LOG_ERROR ( r - > ioptions . info_log ,
" Decompressed block did not match raw block " ) ;
out_status =
* out_status =
Status : : Corruption ( " Decompressed block did not match raw block " ) ;
}
} else {
// Decompression reported an error. abort.
out_status = Status : : Corruption ( " Could not decompress " ) ;
* out_status = Status : : Corruption ( " Could not decompress " ) ;
abort_compression = true ;
}
}
@ -977,9 +1037,9 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
// verification.
if ( abort_compression ) {
RecordTick ( r - > ioptions . statistics , NUMBER_BLOCK_NOT_COMPRESSED ) ;
type = kNoCompression ;
block_contents = raw_block_contents ;
} else if ( type ! = kNoCompression ) {
* type = kNoCompression ;
* block_contents = raw_block_contents ;
} else if ( * type ! = kNoCompression ) {
if ( ShouldReportDetailedTime ( r - > ioptions . env , r - > ioptions . statistics ) ) {
RecordTimeToHistogram ( r - > ioptions . statistics , COMPRESSION_TIMES_NANOS ,
timer . ElapsedNanos ( ) ) ;
@ -987,7 +1047,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
RecordInHistogram ( r - > ioptions . statistics , BYTES_COMPRESSED ,
raw_block_contents . size ( ) ) ;
RecordTick ( r - > ioptions . statistics , NUMBER_BLOCK_COMPRESSED ) ;
} else if ( type ! = r - > compression_type ) {
} else if ( * type ! = r - > compression_type ) {
RecordTick ( r - > ioptions . statistics , NUMBER_BLOCK_NOT_COMPRESSED ) ;
}
}
@ -1052,10 +1112,10 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
if ( io_s . ok ( ) ) {
s = InsertBlockInCache ( block_contents , type , handle ) ;
if ( ! s . ok ( ) ) {
SetStatusAtom ( s ) ;
r - > SetStatus ( s ) ;
}
} else {
SetIOStatusAtom ( io_s ) ;
r - > SetIOStatus ( io_s ) ;
}
if ( s . ok ( ) & & io_s . ok ( ) ) {
r - > set_offset ( r - > get_offset ( ) + block_contents . size ( ) +
@ -1069,7 +1129,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
if ( io_s . ok ( ) ) {
r - > set_offset ( r - > get_offset ( ) + pad_bytes ) ;
} else {
SetIOStatusAtom ( io_s ) ;
r - > SetIOStatus ( io_s ) ;
}
}
if ( r - > compression_opts . parallel_threads > 1 ) {
@ -1108,10 +1168,10 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
}
}
} else {
SetIOStatusAtom ( io_s ) ;
r - > SetIOStatus ( io_s ) ;
}
if ( ! io_s . ok ( ) & & s . ok ( ) ) {
SetStatusAtom ( io_s ) ;
r - > SetStatus ( io_s ) ;
}
}
@ -1122,7 +1182,7 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
while ( r - > pc_rep - > write_queue . pop ( slot ) ) {
slot - > Take ( block_rep ) ;
if ( ! block_rep - > status . ok ( ) ) {
SetStatusAtom ( block_rep - > status ) ;
r - > SetStatus ( block_rep - > status ) ;
break ;
}
@ -1139,7 +1199,7 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
r - > pc_rep - > raw_bytes_curr_block = block_rep - > data - > size ( ) ;
WriteRawBlock ( block_rep - > contents , block_rep - > compression_type ,
& r - > pending_handle , true /* is_data_block*/ ) ;
if ( ! r - > status . ok ( ) ) {
if ( ! ok ( ) ) {
break ;
}
@ -1170,40 +1230,10 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
}
}
Status BlockBasedTableBuilder : : status ( ) const {
if ( rep_ - > compression_opts . parallel_threads > 1 ) {
std : : lock_guard < std : : mutex > lock ( rep_ - > status_mutex ) ;
return rep_ - > status ;
} else {
return rep_ - > status ;
}
}
Status BlockBasedTableBuilder : : status ( ) const { return rep_ - > GetStatus ( ) ; }
IOStatus BlockBasedTableBuilder : : io_status ( ) const {
if ( rep_ - > compression_opts . parallel_threads > 1 ) {
std : : lock_guard < std : : mutex > lock ( rep_ - > io_status_mutex ) ;
return rep_ - > io_status ;
} else {
return rep_ - > io_status ;
}
}
void BlockBasedTableBuilder : : SetStatusAtom ( Status s ) {
if ( rep_ - > compression_opts . parallel_threads > 1 ) {
std : : lock_guard < std : : mutex > lock ( rep_ - > status_mutex ) ;
rep_ - > status = s ;
} else {
rep_ - > status = s ;
}
}
void BlockBasedTableBuilder : : SetIOStatusAtom ( IOStatus io_s ) {
if ( rep_ - > compression_opts . parallel_threads > 1 ) {
std : : lock_guard < std : : mutex > lock ( rep_ - > io_status_mutex ) ;
rep_ - > io_status = io_s ;
} else {
rep_ - > io_status = io_s ;
}
return rep_ - > GetIOStatus ( ) ;
}
static void DeleteCachedBlockContents ( const Slice & /*key*/ , void * value ) {
@ -1294,7 +1324,7 @@ void BlockBasedTableBuilder::WriteIndexBlock(
// HashIndexBuilder which is not multi-partition.
assert ( index_blocks . meta_blocks . empty ( ) ) ;
} else if ( ok ( ) & & ! index_builder_status . ok ( ) ) {
rep_ - > status = index_builder_status ;
rep_ - > SetStatus ( index_builder_status ) ;
}
if ( ok ( ) ) {
for ( const auto & item : index_blocks . meta_blocks ) {
@ -1319,7 +1349,7 @@ void BlockBasedTableBuilder::WriteIndexBlock(
while ( ok ( ) & & s . IsIncomplete ( ) ) {
s = rep_ - > index_builder - > Finish ( & index_blocks , * index_block_handle ) ;
if ( ! s . ok ( ) & & ! s . IsIncomplete ( ) ) {
rep_ - > status = s ;
rep_ - > SetStatus ( s ) ;
return ;
}
if ( rep_ - > table_options . enable_index_compression ) {
@ -1469,13 +1499,13 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
footer . set_checksum ( r - > table_options . checksum ) ;
std : : string footer_encoding ;
footer . EncodeTo ( & footer_encoding ) ;
assert ( r - > status . ok ( ) ) ;
assert ( r - > io_status . ok ( ) ) ;
r - > io_status = r - > file - > Append ( footer_encoding ) ;
if ( r - > io_statu s . ok ( ) ) {
assert ( ok ( ) ) ;
IOStatus ios = r - > file - > Append ( footer_encoding ) ;
r - > SetIOStatus ( ios ) ;
if ( ios . ok ( ) ) {
r - > set_offset ( r - > get_offset ( ) + footer_encoding . size ( ) ) ;
}
r - > status = r - > io_status ;
r - > SyncStatusFromIOStatus ( ) ;
}
void BlockBasedTableBuilder : : EnterUnbuffered ( ) {
@ -1622,7 +1652,7 @@ Status BlockBasedTableBuilder::Finish() {
WriteFooter ( metaindex_block_handle , index_block_handle ) ;
}
r - > state = Rep : : State : : kClosed ;
return r - > status ;
return r - > GetStatus ( ) ;
}
void BlockBasedTableBuilder : : Abandon ( ) {