@ -2694,7 +2694,10 @@ Status DBImpl::ProcessKeyValueCompaction(
Iterator * input ,
Iterator * input ,
CompactionState * compact ,
CompactionState * compact ,
bool is_compaction_v2 ,
bool is_compaction_v2 ,
int * num_output_records ,
LogBuffer * log_buffer ) {
LogBuffer * log_buffer ) {
assert ( num_output_records ! = nullptr ) ;
size_t combined_idx = 0 ;
size_t combined_idx = 0 ;
Status status ;
Status status ;
std : : string compaction_filter_value ;
std : : string compaction_filter_value ;
@ -2965,6 +2968,7 @@ Status DBImpl::ProcessKeyValueCompaction(
}
}
compact - > current_output ( ) - > largest . DecodeFrom ( newkey ) ;
compact - > current_output ( ) - > largest . DecodeFrom ( newkey ) ;
compact - > builder - > Add ( newkey , value ) ;
compact - > builder - > Add ( newkey , value ) ;
( * num_output_records ) + + ,
compact - > current_output ( ) - > largest_seqno =
compact - > current_output ( ) - > largest_seqno =
std : : max ( compact - > current_output ( ) - > largest_seqno , seqno ) ;
std : : max ( compact - > current_output ( ) - > largest_seqno , seqno ) ;
@ -3140,6 +3144,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
mutex_ . Unlock ( ) ;
mutex_ . Unlock ( ) ;
log_buffer - > FlushBufferToLog ( ) ;
log_buffer - > FlushBufferToLog ( ) ;
int num_output_records = 0 ;
const uint64_t start_micros = env_ - > NowMicros ( ) ;
const uint64_t start_micros = env_ - > NowMicros ( ) ;
unique_ptr < Iterator > input ( versions_ - > MakeInputIterator ( compact - > compaction ) ) ;
unique_ptr < Iterator > input ( versions_ - > MakeInputIterator ( compact - > compaction ) ) ;
input - > SeekToFirst ( ) ;
input - > SeekToFirst ( ) ;
@ -3168,6 +3173,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
input . get ( ) ,
input . get ( ) ,
compact ,
compact ,
false ,
false ,
& num_output_records ,
log_buffer ) ;
log_buffer ) ;
} else {
} else {
// temp_backup_input always point to the start of the current buffer
// temp_backup_input always point to the start of the current buffer
@ -3249,6 +3255,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
input . get ( ) ,
input . get ( ) ,
compact ,
compact ,
true ,
true ,
& num_output_records ,
log_buffer ) ;
log_buffer ) ;
if ( ! status . ok ( ) ) {
if ( ! status . ok ( ) ) {
@ -3286,6 +3293,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
input . get ( ) ,
input . get ( ) ,
compact ,
compact ,
true ,
true ,
& num_output_records ,
log_buffer ) ;
log_buffer ) ;
compact - > CleanupBatchBuffer ( ) ;
compact - > CleanupBatchBuffer ( ) ;
@ -3309,6 +3317,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
input . get ( ) ,
input . get ( ) ,
compact ,
compact ,
true ,
true ,
& num_output_records ,
log_buffer ) ;
log_buffer ) ;
} // checking for compaction filter v2
} // checking for compaction filter v2
@ -3342,17 +3351,24 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
}
stats . files_out_levelnp1 = num_output_files ;
stats . files_out_levelnp1 = num_output_files ;
uint64_t num_input_records = 0 ;
for ( int i = 0 ; i < compact - > compaction - > num_input_files ( 0 ) ; i + + ) {
for ( int i = 0 ; i < compact - > compaction - > num_input_files ( 0 ) ; i + + ) {
stats . bytes_readn + = compact - > compaction - > input ( 0 , i ) - > fd . GetFileSize ( ) ;
stats . bytes_readn + = compact - > compaction - > input ( 0 , i ) - > fd . GetFileSize ( ) ;
stats . num_input_records + = compact - > compaction - > input ( 0 , i ) - > num_entries ;
num_input_records + = compact - > compaction - > input ( 0 , i ) - > num_entries ;
}
}
for ( int i = 0 ; i < compact - > compaction - > num_input_files ( 1 ) ; i + + ) {
for ( int i = 0 ; i < compact - > compaction - > num_input_files ( 1 ) ; i + + ) {
stats . bytes_readnp1 + = compact - > compaction - > input ( 1 , i ) - > fd . GetFileSize ( ) ;
stats . bytes_readnp1 + = compact - > compaction - > input ( 1 , i ) - > fd . GetFileSize ( ) ;
num_input_records + = compact - > compaction - > input ( 1 , i ) - > num_entries ;
}
}
for ( int i = 0 ; i < num_output_files ; i + + ) {
for ( int i = 0 ; i < num_output_files ; i + + ) {
stats . bytes_written + = compact - > outputs [ i ] . file_size ;
stats . bytes_written + = compact - > outputs [ i ] . file_size ;
}
}
stats . num_dropped_records =
static_cast < int > ( num_input_records ) - num_output_records ;
RecordCompactionIOStats ( ) ;
RecordCompactionIOStats ( ) ;
@ -3375,7 +3391,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
" [%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
" [%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
" files in(%d, %d) out(%d) "
" files in(%d, %d) out(%d) "
" MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
" MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
" write-amplify(%.1f) %s \n " ,
" write-amplify(%.1f) %s, records in: %d, records dropped: %d \n " ,
cfd - > GetName ( ) . c_str ( ) , cfd - > current ( ) - > LevelSummary ( & tmp ) ,
cfd - > GetName ( ) . c_str ( ) , cfd - > current ( ) - > LevelSummary ( & tmp ) ,
( stats . bytes_readn + stats . bytes_readnp1 ) /
( stats . bytes_readn + stats . bytes_readnp1 ) /
static_cast < double > ( stats . micros ) ,
static_cast < double > ( stats . micros ) ,
@ -3387,7 +3403,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
( stats . bytes_written + stats . bytes_readnp1 + stats . bytes_readn ) /
( stats . bytes_written + stats . bytes_readnp1 + stats . bytes_readn ) /
( double ) stats . bytes_readn ,
( double ) stats . bytes_readn ,
stats . bytes_written / ( double ) stats . bytes_readn ,
stats . bytes_written / ( double ) stats . bytes_readn ,
status . ToString ( ) . c_str ( ) ) ;
status . ToString ( ) . c_str ( ) , stats . num_input_records ,
stats . num_dropped_records ) ;
return status ;
return status ;
}
}