@@ -2159,147 +2159,247 @@ void VersionSet::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
}
}

//
// Look at overall size amplification. If size amplification
// exceeds the configured value, then do a compaction
// of the candidate files all the way up to the earliest
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction* VersionSet::PickCompactionUniversalSizeAmp(
    int level, double score) {
  assert(level == 0);
  // percentage flexibility while reducing size amplification
  uint64_t ratio = options_->compaction_options_universal.
                     max_size_amplification_percent;
  // The files are sorted from newest first to oldest last.
  std::vector<int>& file_by_time = current_->files_by_size_[level];
  assert(file_by_time.size() == current_->files_[level].size());
  unsigned int candidate_count = 0;
  uint64_t candidate_size = 0;
  unsigned int start_index = 0;
  FileMetaData* f = nullptr;
  // Skip files that are already being compacted
  for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
    int index = file_by_time[loop];
    f = current_->files_[level][index];
    if (!f->being_compacted) {
      start_index = loop;  // Consider this as the first candidate.
      break;
    }
    Log(options_->info_log, "Universal: skipping file %ld[%d] compacted %s",
        f->number, loop, " cannot be a candidate to reduce size amp.\n");
    f = nullptr;
  }
  if (f == nullptr) {
    return nullptr;  // no candidate files
  }
  Log(options_->info_log, "Universal: First candidate file %ld[%d] %s",
      f->number, start_index, " to reduce size amp.\n");
  // keep adding up all the remaining files
  for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
       loop++) {
    int index = file_by_time[loop];
    f = current_->files_[level][index];
    if (f->being_compacted) {
      Log(options_->info_log,
          "Universal: Possible candidate file %ld[%d] %s.", f->number, loop,
          " is already being compacted. No size amp reduction possible.\n");
      return nullptr;
    }
    candidate_size += f->file_size;
    candidate_count++;
  }
  if (candidate_count == 0) {
    return nullptr;
  }
  // size of earliest file
  int index = file_by_time[file_by_time.size() - 1];
  uint64_t earliest_file_size = current_->files_[level][index]->file_size;
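  // A worked example with illustrative numbers (not from the source): with
  // max_size_amplification_percent = 200, an earliest file of 400MB and
  // newer files totaling 1000MB give 1000 * 100 >= 200 * 400, so the
  // estimated amplification exceeds the limit and the check below triggers
  // a compaction of all candidates down to the earliest file.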
  // size amplification = percentage of additional size
  if (candidate_size * 100 < ratio * earliest_file_size) {
    Log(options_->info_log,
        "Universal: size amp not needed. newer-files-total-size %ld "
        "earliest-file-size %ld",
        candidate_size, earliest_file_size);
    return nullptr;
  } else {
    Log(options_->info_log,
        "Universal: size amp needed. newer-files-total-size %ld "
        "earliest-file-size %ld",
        candidate_size, earliest_file_size);
  }
  assert(start_index >= 0 && start_index < file_by_time.size() - 1);

  // create a compaction request
  Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level),
                                 LLONG_MAX, NumberLevels());
  c->score_ = score;
  for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
    int index = file_by_time[loop];
    f = current_->files_[level][index];
    c->inputs_[0].push_back(f);
    Log(options_->info_log,
        "Universal: size amp picking file %ld[%d] with size %ld",
        f->number, index, f->file_size);
  }
  return c;
}

//
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction* VersionSet::PickCompactionUniversalReadAmp(
    int level, double score, unsigned int ratio,
    unsigned int max_number_of_files_to_compact) {
  unsigned int min_merge_width =
      options_->compaction_options_universal.min_merge_width;
  unsigned int max_merge_width =
      options_->compaction_options_universal.max_merge_width;
  // The files are sorted from newest first to oldest last.
  std::vector<int>& file_by_time = current_->files_by_size_[level];
  FileMetaData* f = nullptr;
  bool done = false;
  int start_index = 0;
  unsigned int candidate_count;
  assert(file_by_time.size() == current_->files_[level].size());
  unsigned int max_files_to_compact = std::min(max_merge_width,
                                               max_number_of_files_to_compact);
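  // A merge of fewer than two files cannot reduce the file count, so
  // clamp min_merge_width to a floor of 2.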
  min_merge_width = std::max(min_merge_width, 2U);
  // Considers a candidate file only if it is smaller than the
  // total size accumulated so far.
  for (unsigned int loop = 0; loop < file_by_time.size(); ) {
    candidate_count = 0;
    // Skip files that are already being compacted
    for (f = nullptr; loop < file_by_time.size(); loop++) {
      int index = file_by_time[loop];
      f = current_->files_[level][index];
      if (!f->being_compacted) {
        candidate_count = 1;
        break;
      }
      Log(options_->info_log,
          "Universal: file %ld[%d] being compacted, skipping",
          f->number, loop);
      f = nullptr;
    }
    // This file is not being compacted. Consider it as the
    // first candidate to be compacted.
    uint64_t candidate_size = f != nullptr ? f->file_size : 0;
    if (f != nullptr) {
      Log(options_->info_log, "Universal: Possible candidate file %ld[%d].",
          f->number, loop);
    }
    // Check if the succeeding files need compaction.
    for (unsigned int i = loop + 1;
         candidate_count < max_files_to_compact && i < file_by_time.size();
         i++) {
      int index = file_by_time[i];
      FileMetaData* f = current_->files_[level][index];
      if (f->being_compacted) {
        break;
      }
      // pick files if the total candidate file size (increased by the
      // specified ratio) is still larger than the next candidate file.
      uint64_t sz = (candidate_size * (100L + ratio)) / 100;
      if (sz < f->file_size) {
        break;
      }
      candidate_count++;
      candidate_size += f->file_size;
    }
    // Found a series of consecutive files that need compaction.
    if (candidate_count >= (unsigned int)min_merge_width) {
      start_index = loop;
      done = true;
      break;
    } else {
      for (unsigned int i = loop;
           i < loop + candidate_count && i < file_by_time.size(); i++) {
        int index = file_by_time[i];
        FileMetaData* f = current_->files_[level][index];
        Log(options_->info_log,
            "Universal: Skipping file %ld[%d] with size %ld %d\n",
            f->number, i, f->file_size, f->being_compacted);
      }
    }
    loop += candidate_count;
  }
  if (!done || candidate_count <= 1) {
    return nullptr;
  }
  Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level),
                                 LLONG_MAX, NumberLevels());
  c->score_ = score;
  for (unsigned int i = start_index; i < start_index + candidate_count; i++) {
    int index = file_by_time[i];
    FileMetaData* f = current_->files_[level][index];
    c->inputs_[0].push_back(f);
    Log(options_->info_log, "Universal: Picking file %ld[%d] with size %ld\n",
        f->number, i, f->file_size);
  }
  return c;
}

//
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
  assert(level == 0);
  if ((current_->files_[level].size() <=
       (unsigned int)options_->level0_file_num_compaction_trigger)) {
    Log(options_->info_log, "Universal: nothing to do\n");
    return nullptr;
  }
  VersionSet::FileSummaryStorage tmp;
  Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
      current_->files_[level].size(),
      LevelFileSummary(&tmp, 0));
  // Check for size amplification first.
  Compaction* c = PickCompactionUniversalSizeAmp(level, score);
  if (c == nullptr) {
    // Size amplification is within limits. Try reducing read
    // amplification while maintaining file size ratios.
    unsigned int ratio = options_->compaction_options_universal.size_ratio;
    c = PickCompactionUniversalReadAmp(level, score, ratio, UINT_MAX);
    // Size amplification and file size ratios are within configured limits.
    // If max read amplification exceeds configured limits, then force
    // compaction without looking at file-size ratios and try to reduce
    // the number of files to fewer than level0_file_num_compaction_trigger.
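    // A worked example with illustrative numbers (not from the source):
    // with 12 level-0 files and level0_file_num_compaction_trigger = 4,
    // num_files = 8, so the forced pass below may merge up to 8 files;
    // passing UINT_MAX as the ratio makes the size-ratio check always pass.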
    if (c == nullptr) {
      unsigned int num_files = current_->files_[level].size() -
                               options_->level0_file_num_compaction_trigger;
      c = PickCompactionUniversalReadAmp(level, score, UINT_MAX, num_files);
    }
  }
  if (c == nullptr) {
    return nullptr;
  }
  assert(c->inputs_[0].size() > 1);

  // validate that all the chosen files are non overlapping in time
  FileMetaData* newerfile __attribute__((unused)) = nullptr;
@@ -2311,6 +2411,9 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
    newerfile = f;
  }
  // The files are sorted from newest first to oldest last.
  std::vector<int>& file_by_time = current_->files_by_size_[level];

  // Is the earliest file part of this compaction?
  int last_index = file_by_time[file_by_time.size() - 1];
  FileMetaData* last_file = current_->files_[level][last_index];