@ -81,7 +81,11 @@ struct CompactionJob::CompactionState {
Output * current_output ( ) { return & outputs [ outputs . size ( ) - 1 ] ; }
explicit CompactionState ( Compaction * c ) : compaction ( c ) , total_bytes ( 0 ) { }
explicit CompactionState ( Compaction * c )
: compaction ( c ) ,
total_bytes ( 0 ) ,
num_input_records ( 0 ) ,
num_output_records ( 0 ) { }
// Create a client visible context of this compaction
CompactionFilter : : Context GetFilterContextV1 ( ) {
@ -117,6 +121,9 @@ struct CompactionJob::CompactionState {
std : : string cur_prefix_ ;
uint64_t num_input_records ;
uint64_t num_output_records ;
// Buffers the kv-pair that will be run through compaction filter V2
// in the future.
void BufferKeyValueSlices ( const Slice & key , const Slice & value ) {
@ -271,7 +278,6 @@ Status CompactionJob::Run() {
log_buffer_ - > FlushBufferToLog ( ) ;
ColumnFamilyData * cfd = compact_ - > compaction - > column_family_data ( ) ;
int num_output_records = 0 ;
const uint64_t start_micros = env_ - > NowMicros ( ) ;
std : : unique_ptr < Iterator > input (
versions_ - > MakeInputIterator ( compact_ - > compaction ) ) ;
@ -289,8 +295,7 @@ Status CompactionJob::Run() {
int64_t imm_micros = 0 ; // Micros spent doing imm_ compactions
if ( ! compaction_filter_v2 ) {
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , false ,
& num_output_records ) ;
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , false ) ;
} else {
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
@ -361,8 +366,7 @@ Status CompactionJob::Run() {
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , true ,
& num_output_records ) ;
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , true ) ;
if ( ! status . ok ( ) ) {
break ;
@ -387,8 +391,7 @@ Status CompactionJob::Run() {
}
compact_ - > MergeKeyValueSliceBuffer ( & cfd - > internal_comparator ( ) ) ;
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , true ,
& num_output_records ) ;
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , true ) ;
compact_ - > CleanupBatchBuffer ( ) ;
compact_ - > CleanupMergedBuffer ( ) ;
@ -399,8 +402,7 @@ Status CompactionJob::Run() {
CallCompactionFilterV2 ( compaction_filter_v2 ) ;
}
compact_ - > MergeKeyValueSliceBuffer ( & cfd - > internal_comparator ( ) ) ;
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , true ,
& num_output_records ) ;
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , true ) ;
} // checking for compaction filter v2
if ( status . ok ( ) & &
@ -434,27 +436,26 @@ Status CompactionJob::Run() {
}
compaction_stats_ . files_out_levelnp1 = num_output_files ;
uint64_t num_input_records = 0 ;
for ( int i = 0 ; i < compact_ - > compaction - > num_input_files ( 0 ) ; i + + ) {
compaction_stats_ . bytes_readn + =
compact_ - > compaction - > input ( 0 , i ) - > fd . GetFileSize ( ) ;
compaction_stats_ . num_input_records + =
compact_ - > compaction - > input ( 0 , i ) - > num_entries ;
num_input_records + = compact_ - > compaction - > input ( 0 , i ) - > num_entries ;
static_cast < uint64_t > ( compact_ - > compaction - > input ( 0 , i ) - > num_entries ) ;
}
for ( int i = 0 ; i < compact_ - > compaction - > num_input_files ( 1 ) ; i + + ) {
compaction_stats_ . bytes_readnp1 + =
compact_ - > compaction - > input ( 1 , i ) - > fd . GetFileSize ( ) ;
num_input_records + = compact_ - > compaction - > input ( 1 , i ) - > num_entries ;
}
for ( int i = 0 ; i < num_output_files ; i + + ) {
compaction_stats_ . bytes_written + = compact_ - > outputs [ i ] . file_size ;
}
compaction_stats_ . num_dropped_records =
static_cast < int > ( num_input_records ) - num_output_records ;
if ( compact_ - > num_input_records > compact_ - > num_output_records ) {
compaction_stats_ . num_dropped_records + =
compact_ - > num_input_records - compact_ - > num_output_records ;
compact_ - > num_input_records = compact_ - > num_output_records = 0 ;
}
RecordCompactionIOStats ( ) ;
@ -518,10 +519,7 @@ void CompactionJob::AllocateCompactionOutputFileNumbers() {
Status CompactionJob : : ProcessKeyValueCompaction ( int64_t * imm_micros ,
Iterator * input ,
bool is_compaction_v2 ,
int * num_output_records ) {
assert ( num_output_records ! = nullptr ) ;
bool is_compaction_v2 ) {
size_t combined_idx = 0 ;
Status status ;
std : : string compaction_filter_value ;
@ -553,6 +551,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
int64_t loop_cnt = 0 ;
while ( input - > Valid ( ) & & ! shutting_down_ - > load ( std : : memory_order_acquire ) & &
! cfd - > IsDropped ( ) & & status . ok ( ) ) {
compact_ - > num_input_records + + ;
if ( + + loop_cnt > 1000 ) {
if ( key_drop_user > 0 ) {
RecordTick ( stats_ , COMPACTION_KEY_DROP_USER , key_drop_user ) ;
@ -795,7 +794,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
}
compact_ - > current_output ( ) - > largest . DecodeFrom ( newkey ) ;
compact_ - > builder - > Add ( newkey , value ) ;
( * num_output_records ) + + ,
compact_ - > num_output_records + + ,
compact_ - > current_output ( ) - > largest_seqno =
std : : max ( compact_ - > current_output ( ) - > largest_seqno , seqno ) ;