@ -283,7 +283,7 @@ struct BlockBasedTableBuilder::Rep {
const BlockBasedTableOptions table_options ;
const InternalKeyComparator & internal_comparator ;
WritableFileWriter * file ;
uint64_t offset = 0 ;
std : : atomic < uint64_t > offset ;
Status status ;
IOStatus io_status ;
// Synchronize status & io_status accesses across threads from main thread,
@ -365,6 +365,9 @@ struct BlockBasedTableBuilder::Rep {
std : : unique_ptr < ParallelCompressionRep > pc_rep ;
uint64_t get_offset ( ) { return offset . load ( std : : memory_order_relaxed ) ; }
void set_offset ( uint64_t o ) { offset . store ( o , std : : memory_order_relaxed ) ; }
Rep ( const ImmutableCFOptions & _ioptions , const MutableCFOptions & _moptions ,
const BlockBasedTableOptions & table_opt ,
const InternalKeyComparator & icomparator ,
@ -382,6 +385,7 @@ struct BlockBasedTableBuilder::Rep {
table_options ( table_opt ) ,
internal_comparator ( icomparator ) ,
file ( f ) ,
offset ( 0 ) ,
alignment ( table_options . block_align
? std : : min ( table_options . block_size , kDefaultPageSize )
: 0 ) ,
@ -567,9 +571,9 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
// Number of blocks under compression and not appended yet.
std : : atomic < uint64_t > blocks_inflight ;
// Current compression ratio, maintained by BGWorkWriteRawBlock.
double curr_compression_ratio ;
std : : atomic < double > curr_compression_ratio ;
// Estimated SST file size.
uint64_t estimated_file_size ;
std : : atomic < uint64_t > estimated_file_size ;
// Wait for the completion of first block compression to get a
// non-zero compression ratio.
@ -742,13 +746,15 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
r - > index_builder - > OnKeyAdded ( key ) ;
}
}
NotifyCollectTableCollectorsOnAdd ( key , value , r - > offset ,
// TODO offset passed in is not accurate for parallel compression case
NotifyCollectTableCollectorsOnAdd ( key , value , r - > get_offset ( ) ,
r - > table_properties_collectors ,
r - > ioptions . info_log ) ;
} else if ( value_type = = kTypeRangeDeletion ) {
r - > range_del_block . Add ( key , value ) ;
NotifyCollectTableCollectorsOnAdd ( key , value , r - > offset ,
// TODO offset passed in is not accurate for parallel compression case
NotifyCollectTableCollectorsOnAdd ( key , value , r - > get_offset ( ) ,
r - > table_properties_collectors ,
r - > ioptions . info_log ) ;
} else {
@ -802,11 +808,13 @@ void BlockBasedTableBuilder::Flush() {
block_rep - > data - > size ( ) ;
uint64_t new_blocks_inflight =
r - > pc_rep - > blocks_inflight . fetch_add ( 1 , std : : memory_order_relaxed ) + 1 ;
r - > pc_rep - > estimated_file_size =
r - > offset +
static_cast < uint64_t > ( static_cast < double > ( new_raw_bytes_inflight ) *
r - > pc_rep - > curr_compression_ratio ) +
new_blocks_inflight * kBlockTrailerSize ;
r - > pc_rep - > estimated_file_size . store (
r - > get_offset ( ) +
static_cast < uint64_t > ( static_cast < double > ( new_raw_bytes_inflight ) *
r - > pc_rep - > curr_compression_ratio . load (
std : : memory_order_relaxed ) ) +
new_blocks_inflight * kBlockTrailerSize ,
std : : memory_order_relaxed ) ;
assert ( block_rep - > status . ok ( ) ) ;
if ( ! r - > pc_rep - > write_queue . push ( block_rep - > slot . get ( ) ) ) {
@ -856,9 +864,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
r - > compressed_output . clear ( ) ;
if ( is_data_block ) {
if ( r - > filter_builder ! = nullptr ) {
r - > filter_builder - > StartBlock ( r - > offset ) ;
r - > filter_builder - > StartBlock ( r - > get_ offset( ) ) ;
}
r - > props . data_size = r - > offset ;
r - > props . data_size = r - > get_ offset( ) ;
+ + r - > props . num_data_blocks ;
}
}
@ -986,7 +994,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
Status s = Status : : OK ( ) ;
IOStatus io_s = IOStatus : : OK ( ) ;
StopWatch sw ( r - > ioptions . env , r - > ioptions . statistics , WRITE_RAW_BLOCK_MICROS ) ;
handle - > set_offset ( r - > offset ) ;
handle - > set_offset ( r - > get_ offset( ) ) ;
handle - > set_size ( block_contents . size ( ) ) ;
assert ( status ( ) . ok ( ) ) ;
assert ( io_status ( ) . ok ( ) ) ;
@ -1044,7 +1052,8 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
SetIOStatusAtom ( io_s ) ;
}
if ( s . ok ( ) & & io_s . ok ( ) ) {
r - > offset + = block_contents . size ( ) + kBlockTrailerSize ;
r - > set_offset ( r - > get_offset ( ) + block_contents . size ( ) +
kBlockTrailerSize ) ;
if ( r - > table_options . block_align & & is_data_block ) {
size_t pad_bytes =
( r - > alignment - ( ( block_contents . size ( ) + kBlockTrailerSize ) &
@ -1052,7 +1061,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
( r - > alignment - 1 ) ;
io_s = r - > file - > Pad ( pad_bytes ) ;
if ( io_s . ok ( ) ) {
r - > offset + = pad_bytes ;
r - > set_offset ( r - > get_offset ( ) + pad_bytes ) ;
} else {
SetIOStatusAtom ( io_s ) ;
}
@ -1062,12 +1071,14 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
assert ( r - > pc_rep - > raw_bytes_compressed +
r - > pc_rep - > raw_bytes_curr_block >
0 ) ;
r - > pc_rep - > curr_compression_ratio =
( r - > pc_rep - > curr_compression_ratio *
r - > pc_rep - > curr_compression_ratio . store (
( r - > pc_rep - > curr_compression_ratio . load (
std : : memory_order_relaxed ) *
r - > pc_rep - > raw_bytes_compressed +
block_contents . size ( ) ) /
static_cast < double > ( r - > pc_rep - > raw_bytes_compressed +
r - > pc_rep - > raw_bytes_curr_block ) ;
static_cast < double > ( r - > pc_rep - > raw_bytes_compressed +
r - > pc_rep - > raw_bytes_curr_block ) ,
std : : memory_order_relaxed ) ;
r - > pc_rep - > raw_bytes_compressed + = r - > pc_rep - > raw_bytes_curr_block ;
uint64_t new_raw_bytes_inflight =
r - > pc_rep - > raw_bytes_inflight . fetch_sub (
@ -1076,13 +1087,17 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
uint64_t new_blocks_inflight = r - > pc_rep - > blocks_inflight . fetch_sub (
1 , std : : memory_order_relaxed ) -
1 ;
r - > pc_rep - > estimated_file_size =
r - > offset +
static_cast < uint64_t > ( static_cast < double > ( new_raw_bytes_inflight ) *
r - > pc_rep - > curr_compression_ratio ) +
new_blocks_inflight * kBlockTrailerSize ;
r - > pc_rep - > estimated_file_size . store (
r - > get_offset ( ) +
static_cast < uint64_t > (
static_cast < double > ( new_raw_bytes_inflight ) *
r - > pc_rep - > curr_compression_ratio . load (
std : : memory_order_relaxed ) ) +
new_blocks_inflight * kBlockTrailerSize ,
std : : memory_order_relaxed ) ;
} else {
r - > pc_rep - > estimated_file_size = r - > offset ;
r - > pc_rep - > estimated_file_size . store ( r - > get_offset ( ) ,
std : : memory_order_relaxed ) ;
}
}
}
@ -1129,9 +1144,9 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
}
if ( r - > filter_builder ! = nullptr ) {
r - > filter_builder - > StartBlock ( r - > offset ) ;
r - > filter_builder - > StartBlock ( r - > get_ offset( ) ) ;
}
r - > props . data_size = r - > offset ;
r - > props . data_size = r - > get_ offset( ) ;
+ + r - > props . num_data_blocks ;
if ( block_rep - > first_key_in_next_block = = nullptr ) {
@ -1227,7 +1242,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
& DeleteCachedBlockContents ) ;
// Invalidate OS cache.
r - > file - > InvalidateCache ( static_cast < size_t > ( r - > offset ) , size ) ;
r - > file - > InvalidateCache ( static_cast < size_t > ( r - > get_ offset( ) ) , size ) ;
}
return Status : : OK ( ) ;
}
@ -1452,7 +1467,7 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
assert ( r - > io_status . ok ( ) ) ;
r - > io_status = r - > file - > Append ( footer_encoding ) ;
if ( r - > io_status . ok ( ) ) {
r - > offset + = footer_encoding . size ( ) ;
r - > set_offset ( r - > get_offset ( ) + footer_encoding . size ( ) ) ;
}
r - > status = r - > io_status ;
}
@ -1622,13 +1637,17 @@ uint64_t BlockBasedTableBuilder::NumEntries() const {
return rep_ - > props . num_entries ;
}
bool BlockBasedTableBuilder : : IsEmpty ( ) const {
return rep_ - > props . num_entries = = 0 & & rep_ - > props . num_range_deletions = = 0 ;
}
uint64_t BlockBasedTableBuilder : : FileSize ( ) const { return rep_ - > offset ; }
uint64_t BlockBasedTableBuilder : : EstimatedFileSize ( ) const {
if ( rep_ - > compression_opts . parallel_threads > 1 ) {
// Use compression ratio so far and inflight raw bytes to estimate
// final SST size.
return rep_ - > pc_rep - > estimated_file_size ;
return rep_ - > pc_rep - > estimated_file_size . load ( std : : memory_order_relaxed ) ;
} else {
return FileSize ( ) ;
}