@ -25,6 +25,98 @@
namespace rocksdb {
namespace {
// A helper class that form universal compactions. The class is used by
// UniversalCompactionPicker::PickCompaction().
// The usage is to create the class, and get the compaction object by calling
// PickCompaction().
class UniversalCompactionBuilder {
public :
UniversalCompactionBuilder ( const ImmutableCFOptions & ioptions ,
const InternalKeyComparator * icmp ,
const std : : string & cf_name ,
const MutableCFOptions & mutable_cf_options ,
VersionStorageInfo * vstorage ,
UniversalCompactionPicker * picker ,
LogBuffer * log_buffer )
: ioptions_ ( ioptions ) ,
icmp_ ( icmp ) ,
cf_name_ ( cf_name ) ,
mutable_cf_options_ ( mutable_cf_options ) ,
vstorage_ ( vstorage ) ,
picker_ ( picker ) ,
log_buffer_ ( log_buffer ) { }
// Form and return the compaction object. The caller owns return object.
Compaction * PickCompaction ( ) ;
private :
struct SortedRun {
SortedRun ( int _level , FileMetaData * _file , uint64_t _size ,
uint64_t _compensated_file_size , bool _being_compacted )
: level ( _level ) ,
file ( _file ) ,
size ( _size ) ,
compensated_file_size ( _compensated_file_size ) ,
being_compacted ( _being_compacted ) {
assert ( compensated_file_size > 0 ) ;
assert ( level ! = 0 | | file ! = nullptr ) ;
}
void Dump ( char * out_buf , size_t out_buf_size ,
bool print_path = false ) const ;
// sorted_run_count is added into the string to print
void DumpSizeInfo ( char * out_buf , size_t out_buf_size ,
size_t sorted_run_count ) const ;
int level ;
// `file` Will be null for level > 0. For level = 0, the sorted run is
// for this file.
FileMetaData * file ;
// For level > 0, `size` and `compensated_file_size` are sum of sizes all
// files in the level. `being_compacted` should be the same for all files
// in a non-zero level. Use the value here.
uint64_t size ;
uint64_t compensated_file_size ;
bool being_compacted ;
} ;
// Pick Universal compaction to limit read amplification
Compaction * PickCompactionToReduceSortedRuns (
unsigned int ratio , unsigned int max_number_of_files_to_compact ) ;
// Pick Universal compaction to limit space amplification.
Compaction * PickCompactionToReduceSizeAmp ( ) ;
Compaction * PickDeleteTriggeredCompaction ( ) ;
// Used in universal compaction when the enabled_trivial_move
// option is set. Checks whether there are any overlapping files
// in the input. Returns true if the input files are non
// overlapping.
bool IsInputFilesNonOverlapping ( Compaction * c ) ;
const ImmutableCFOptions & ioptions_ ;
const InternalKeyComparator * icmp_ ;
double score_ ;
std : : vector < SortedRun > sorted_runs_ ;
const std : : string & cf_name_ ;
const MutableCFOptions & mutable_cf_options_ ;
VersionStorageInfo * vstorage_ ;
UniversalCompactionPicker * picker_ ;
LogBuffer * log_buffer_ ;
static std : : vector < SortedRun > CalculateSortedRuns (
const VersionStorageInfo & vstorage , const ImmutableCFOptions & ioptions ,
const MutableCFOptions & mutable_cf_options ) ;
// Pick a path ID to place a newly generated file, with its estimated file
// size.
static uint32_t GetPathId ( const ImmutableCFOptions & ioptions ,
const MutableCFOptions & mutable_cf_options ,
uint64_t file_size ) ;
} ;
// Used in universal compaction when trivial move is enabled.
// This structure is used for the construction of min heap
// that contains the file meta data, the level of the file
@ -113,7 +205,7 @@ void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
// Algorithm that checks to see if there are any overlapping
// files in the input
bool UniversalCompactionPick er : : IsInputFilesNonOverlapping ( Compaction * c ) {
bool UniversalCompactionBuild er : : IsInputFilesNonOverlapping ( Compaction * c ) {
auto comparator = icmp_ - > user_comparator ( ) ;
int first_iter = 1 ;
@ -167,9 +259,18 @@ bool UniversalCompactionPicker::NeedsCompaction(
return false ;
}
void UniversalCompactionPicker : : SortedRun : : Dump ( char * out_buf ,
size_t out_buf_size ,
bool print_path ) const {
Compaction * UniversalCompactionPicker : : PickCompaction (
const std : : string & cf_name , const MutableCFOptions & mutable_cf_options ,
VersionStorageInfo * vstorage , LogBuffer * log_buffer ) {
UniversalCompactionBuilder builder ( ioptions_ , icmp_ , cf_name ,
mutable_cf_options , vstorage , this ,
log_buffer ) ;
return builder . PickCompaction ( ) ;
}
void UniversalCompactionBuilder : : SortedRun : : Dump ( char * out_buf ,
size_t out_buf_size ,
bool print_path ) const {
if ( level = = 0 ) {
assert ( file ! = nullptr ) ;
if ( file - > fd . GetPathId ( ) = = 0 | | ! print_path ) {
@ -185,7 +286,7 @@ void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
}
}
void UniversalCompactionPick er : : SortedRun : : DumpSizeInfo (
void UniversalCompactionBuild er : : SortedRun : : DumpSizeInfo (
char * out_buf , size_t out_buf_size , size_t sorted_run_count ) const {
if ( level = = 0 ) {
assert ( file ! = nullptr ) ;
@ -204,11 +305,11 @@ void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
}
}
std : : vector < UniversalCompactionPick er : : SortedRun >
UniversalCompactionPick er : : CalculateSortedRuns (
std : : vector < UniversalCompactionBuild er : : SortedRun >
UniversalCompactionBuild er : : CalculateSortedRuns (
const VersionStorageInfo & vstorage , const ImmutableCFOptions & /*ioptions*/ ,
const MutableCFOptions & mutable_cf_options ) {
std : : vector < UniversalCompactionPick er : : SortedRun > ret ;
std : : vector < UniversalCompactionBuild er : : SortedRun > ret ;
for ( FileMetaData * f : vstorage . LevelFiles ( 0 ) ) {
ret . emplace_back ( 0 , f , f - > fd . GetFileSize ( ) , f - > compensated_file_size ,
f - > being_compacted ) ;
@ -231,8 +332,8 @@ UniversalCompactionPicker::CalculateSortedRuns(
// non-zero level, all the files should share the same being_compacted
// value.
// This assumption is only valid when
// mutable_cf_options.compaction_options_universal.allow_trivial_move is
// false
// mutable_cf_options.compaction_options_universal.allow_trivial_move
// is false
assert ( is_first | | f - > being_compacted = = being_compacted ) ;
}
if ( is_first ) {
@ -250,65 +351,59 @@ UniversalCompactionPicker::CalculateSortedRuns(
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
Compaction * UniversalCompactionPicker : : PickCompaction (
const std : : string & cf_name , const MutableCFOptions & mutable_cf_options ,
VersionStorageInfo * vstorage , LogBuffer * log_buffer ) {
Compaction * UniversalCompactionBuilder : : PickCompaction ( ) {
const int kLevel0 = 0 ;
double score = vstorage - > CompactionScore ( kLevel0 ) ;
std : : vector < SortedRun > s orted_runs =
CalculateSortedRuns ( * vstorage , ioptions_ , mutable_cf_options ) ;
if ( sorted_runs . size ( ) = = 0 | |
( vstorage - > FilesMarkedForCompaction ( ) . empty ( ) & &
sorted_runs . size ( ) < ( unsigned int ) mutable_cf_options
. level0_file_num_compaction_trigger ) ) {
ROCKS_LOG_BUFFER ( log_buffer , " [%s] Universal: nothing to do \n " ,
cf_name . c_str ( ) ) ;
TEST_SYNC_POINT_CALLBACK ( " UniversalCompactionPicker::PickCompaction:Return " ,
nullptr ) ;
score_ = vstorage_ - > CompactionScore ( kLevel0 ) ;
sorted_runs_ =
CalculateSortedRuns ( * vstorage_ , ioptions_ , mutable_cf_options_ ) ;
if ( sorted_runs_ . size ( ) = = 0 | |
( vstorage_ - > FilesMarkedForCompaction ( ) . empty ( ) & &
sorted_runs_ . size ( ) < ( unsigned int ) mutable_cf_options_
. level0_file_num_compaction_trigger ) ) {
ROCKS_LOG_BUFFER ( log_buffer_ , " [%s] Universal: nothing to do \n " ,
cf_name_ . c_str ( ) ) ;
TEST_SYNC_POINT_CALLBACK (
" UniversalCompactionBuilder::PickCompaction:Return " , nullptr ) ;
return nullptr ;
}
VersionStorageInfo : : LevelSummaryStorage tmp ;
ROCKS_LOG_BUFFER_MAX_SZ (
log_buffer , 3072 ,
log_buffer_ , 3072 ,
" [%s] Universal: sorted runs files(% " ROCKSDB_PRIszt " ): %s \n " ,
cf_name . c_str ( ) , sorted_runs . size ( ) , vstorage - > LevelSummary ( & tmp ) ) ;
cf_name_ . c_str ( ) , sorted_runs_ . size ( ) , vstorage_ - > LevelSummary ( & tmp ) ) ;
// Check for size amplification first.
Compaction * c = nullptr ;
if ( sorted_runs . size ( ) > =
if ( sorted_runs_ . size ( ) > =
static_cast < size_t > (
mutable_cf_options . level0_file_num_compaction_trigger ) ) {
if ( ( c = PickCompactionToReduceSizeAmp ( cf_name , mutable_cf_options ,
vstorage , score , sorted_runs ,
log_buffer ) ) ! = nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer , " [%s] Universal: compacting for size amp \n " ,
cf_name . c_str ( ) ) ;
mutable_cf_options_ . level0_file_num_compaction_trigger ) ) {
if ( ( c = PickCompactionToReduceSizeAmp ( ) ) ! = nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer_ , " [%s] Universal: compacting for size amp \n " ,
cf_name_ . c_str ( ) ) ;
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio =
mutable_cf_options . compaction_options_universal . size_ratio ;
mutable_cf_options_ . compaction_options_universal . size_ratio ;
if ( ( c = PickCompactionToReduceSortedRuns (
cf_name , mutable_cf_options , vstorage , score , ratio , UINT_MAX ,
sorted_runs , log_buffer ) ) ! = nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer ,
if ( ( c = PickCompactionToReduceSortedRuns ( ratio , UINT_MAX ) ) ! = nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer_ ,
" [%s] Universal: compacting for size ratio \n " ,
cf_name . c_str ( ) ) ;
cf_name_ . c_str ( ) ) ;
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
// This is guaranteed by NeedsCompaction()
assert ( sorted_runs . size ( ) > =
assert ( sorted_runs_ . size ( ) > =
static_cast < size_t > (
mutable_cf_options . level0_file_num_compaction_trigger ) ) ;
mutable_cf_options_ . level0_file_num_compaction_trigger ) ) ;
// Get the total number of sorted runs that are not being compacted
int num_sr_not_compacted = 0 ;
for ( size_t i = 0 ; i < sorted_runs . size ( ) ; i + + ) {
if ( sorted_runs [ i ] . being_compacted = = false ) {
for ( size_t i = 0 ; i < sorted_runs_ . size ( ) ; i + + ) {
if ( sorted_runs_ [ i ] . being_compacted = = false ) {
num_sr_not_compacted + + ;
}
}
@ -316,16 +411,15 @@ Compaction* UniversalCompactionPicker::PickCompaction(
// The number of sorted runs that are not being compacted is greater
// than the maximum allowed number of sorted runs
if ( num_sr_not_compacted >
mutable_cf_options . level0_file_num_compaction_trigger ) {
mutable_cf_options_ . level0_file_num_compaction_trigger ) {
unsigned int num_files =
num_sr_not_compacted -
mutable_cf_options . level0_file_num_compaction_trigger + 1 ;
if ( ( c = PickCompactionToReduceSortedRuns (
cf_name , mutable_cf_options , vstorage , score , UINT_MAX ,
num_files , sorted_runs , log_buffer ) ) ! = nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer ,
mutable_cf_options_ . level0_file_num_compaction_trigger + 1 ;
if ( ( c = PickCompactionToReduceSortedRuns ( UINT_MAX , num_files ) ) ! =
nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer_ ,
" [%s] Universal: compacting for file num -- %u \n " ,
cf_name . c_str ( ) , num_files ) ;
cf_name_ . c_str ( ) , num_files ) ;
}
}
}
@ -333,22 +427,20 @@ Compaction* UniversalCompactionPicker::PickCompaction(
}
if ( c = = nullptr ) {
if ( ( c = PickDeleteTriggeredCompaction ( cf_name , mutable_cf_options ,
vstorage , score , sorted_runs ,
log_buffer ) ) ! = nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer ,
if ( ( c = PickDeleteTriggeredCompaction ( ) ) ! = nullptr ) {
ROCKS_LOG_BUFFER ( log_buffer_ ,
" [%s] Universal: delete triggered compaction \n " ,
cf_name . c_str ( ) ) ;
cf_name_ . c_str ( ) ) ;
}
}
if ( c = = nullptr ) {
TEST_SYNC_POINT_CALLBACK ( " UniversalCompactionPicker::PickCompaction:Return " ,
nullptr ) ;
TEST_SYNC_POINT_CALLBACK (
" UniversalCompactionBuilder::PickCompaction:Return " , nullptr ) ;
return nullptr ;
}
if ( mutable_cf_options . compaction_options_universal . allow_trivial_move = =
if ( mutable_cf_options_ . compaction_options_universal . allow_trivial_move = =
true ) {
c - > set_is_trivial_move ( IsInputFilesNonOverlapping ( c ) ) ;
}
@ -394,15 +486,15 @@ Compaction* UniversalCompactionPicker::PickCompaction(
RecordInHistogram ( ioptions_ . statistics , NUM_FILES_IN_SINGLE_COMPACTION ,
c - > inputs ( 0 ) - > size ( ) ) ;
RegisterCompaction ( c ) ;
vstorage - > ComputeCompactionScore ( ioptions_ , mutable_cf_options ) ;
picker_ - > RegisterCompaction ( c ) ;
vstorage_ - > ComputeCompactionScore ( ioptions_ , mutable_cf_options_ ) ;
TEST_SYNC_POINT_CALLBACK ( " UniversalCompactionPick er::PickCompaction:Return " ,
TEST_SYNC_POINT_CALLBACK ( " UniversalCompactionBuild er::PickCompaction:Return " ,
c ) ;
return c ;
}
uint32_t UniversalCompactionPick er : : GetPathId (
uint32_t UniversalCompactionBuild er : : GetPathId (
const ImmutableCFOptions & ioptions ,
const MutableCFOptions & mutable_cf_options , uint64_t file_size ) {
// Two conditions need to be satisfied:
@ -440,15 +532,12 @@ uint32_t UniversalCompactionPicker::GetPathId(
// Consider compaction files based on their size differences with
// the next file in time order.
//
Compaction * UniversalCompactionPicker : : PickCompactionToReduceSortedRuns (
const std : : string & cf_name , const MutableCFOptions & mutable_cf_options ,
VersionStorageInfo * vstorage , double score , unsigned int ratio ,
unsigned int max_number_of_files_to_compact ,
const std : : vector < SortedRun > & sorted_runs , LogBuffer * log_buffer ) {
Compaction * UniversalCompactionBuilder : : PickCompactionToReduceSortedRuns (
unsigned int ratio , unsigned int max_number_of_files_to_compact ) {
unsigned int min_merge_width =
mutable_cf_options . compaction_options_universal . min_merge_width ;
mutable_cf_options_ . compaction_options_universal . min_merge_width ;
unsigned int max_merge_width =
mutable_cf_options . compaction_options_universal . max_merge_width ;
mutable_cf_options_ . compaction_options_universal . max_merge_width ;
const SortedRun * sr = nullptr ;
bool done = false ;
@ -462,16 +551,16 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
// Caller checks the size before executing this function. This invariant is
// important because otherwise we may have a possible integer underflow when
// dealing with unsigned types.
assert ( sorted_runs . size ( ) > 0 ) ;
assert ( sorted_runs_ . size ( ) > 0 ) ;
// Considers a candidate file only if it is smaller than the
// total size accumulated so far.
for ( size_t loop = 0 ; loop < sorted_runs . size ( ) ; loop + + ) {
for ( size_t loop = 0 ; loop < sorted_runs_ . size ( ) ; loop + + ) {
candidate_count = 0 ;
// Skip files that are already being compacted
for ( sr = nullptr ; loop < sorted_runs . size ( ) ; loop + + ) {
sr = & sorted_runs [ loop ] ;
for ( sr = nullptr ; loop < sorted_runs_ . size ( ) ; loop + + ) {
sr = & sorted_runs_ [ loop ] ;
if ( ! sr - > being_compacted ) {
candidate_count = 1 ;
@ -479,10 +568,10 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
}
char file_num_buf [ kFormatFileNumberBufSize ] ;
sr - > Dump ( file_num_buf , sizeof ( file_num_buf ) ) ;
ROCKS_LOG_BUFFER ( log_buffer ,
ROCKS_LOG_BUFFER ( log_buffer_ ,
" [%s] Universal: %s "
" [%d] being compacted, skipping " ,
cf_name . c_str ( ) , file_num_buf , loop ) ;
cf_name_ . c_str ( ) , file_num_buf , loop ) ;
sr = nullptr ;
}
@ -493,15 +582,16 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
if ( sr ! = nullptr ) {
char file_num_buf [ kFormatFileNumberBufSize ] ;
sr - > Dump ( file_num_buf , sizeof ( file_num_buf ) , true ) ;
ROCKS_LOG_BUFFER ( log_buffer , " [%s] Universal: Possible candidate %s[%d]. " ,
cf_name . c_str ( ) , file_num_buf , loop ) ;
ROCKS_LOG_BUFFER ( log_buffer_ ,
" [%s] Universal: Possible candidate %s[%d]. " ,
cf_name_ . c_str ( ) , file_num_buf , loop ) ;
}
// Check if the succeeding files need compaction.
for ( size_t i = loop + 1 ;
candidate_count < max_files_to_compact & & i < sorted_runs . size ( ) ;
candidate_count < max_files_to_compact & & i < sorted_runs_ . size ( ) ;
i + + ) {
const SortedRun * succeeding_sr = & sorted_runs [ i ] ;
const SortedRun * succeeding_sr = & sorted_runs_ [ i ] ;
if ( succeeding_sr - > being_compacted ) {
break ;
}
@ -515,7 +605,7 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
if ( sz < static_cast < double > ( succeeding_sr - > size ) ) {
break ;
}
if ( mutable_cf_options . compaction_options_universal . stop_style = =
if ( mutable_cf_options_ . compaction_options_universal . stop_style = =
kCompactionStopStyleSimilarSize ) {
// Similar-size stopping rule: also check the last picked file isn't
// far larger than the next candidate file.
@ -541,12 +631,12 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
break ;
} else {
for ( size_t i = loop ;
i < loop + candidate_count & & i < sorted_runs . size ( ) ; i + + ) {
const SortedRun * skipping_sr = & sorted_runs [ i ] ;
i < loop + candidate_count & & i < sorted_runs_ . size ( ) ; i + + ) {
const SortedRun * skipping_sr = & sorted_runs_ [ i ] ;
char file_num_buf [ 256 ] ;
skipping_sr - > DumpSizeInfo ( file_num_buf , sizeof ( file_num_buf ) , loop ) ;
ROCKS_LOG_BUFFER ( log_buffer , " [%s] Universal: Skipping %s " ,
cf_name . c_str ( ) , file_num_buf ) ;
ROCKS_LOG_BUFFER ( log_buffer_ , " [%s] Universal: Skipping %s " ,
cf_name_ . c_str ( ) , file_num_buf ) ;
}
}
}
@ -558,16 +648,16 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
// size ratio of compression.
bool enable_compression = true ;
int ratio_to_compress =
mutable_cf_options . compaction_options_universal . compression_size_percent ;
mutable_cf_options_ . compaction_options_universal . compression_size_percent ;
if ( ratio_to_compress > = 0 ) {
uint64_t total_size = 0 ;
for ( auto & sorted_run : sorted_runs ) {
for ( auto & sorted_run : sorted_runs_ ) {
total_size + = sorted_run . compensated_file_size ;
}
uint64_t older_file_size = 0 ;
for ( size_t i = sorted_runs . size ( ) - 1 ; i > = first_index_after ; i - - ) {
older_file_size + = sorted_runs [ i ] . size ;
for ( size_t i = sorted_runs_ . size ( ) - 1 ; i > = first_index_after ; i - - ) {
older_file_size + = sorted_runs_ [ i ] . size ;
if ( older_file_size * 100L > = total_size * ( long ) ratio_to_compress ) {
enable_compression = false ;
break ;
@ -577,46 +667,46 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
uint64_t estimated_total_size = 0 ;
for ( unsigned int i = 0 ; i < first_index_after ; i + + ) {
estimated_total_size + = sorted_runs [ i ] . size ;
estimated_total_size + = sorted_runs_ [ i ] . size ;
}
uint32_t path_id =
GetPathId ( ioptions_ , mutable_cf_options , estimated_total_size ) ;
int start_level = sorted_runs [ start_index ] . level ;
GetPathId ( ioptions_ , mutable_cf_options_ , estimated_total_size ) ;
int start_level = sorted_runs_ [ start_index ] . level ;
int output_level ;
if ( first_index_after = = sorted_runs . size ( ) ) {
output_level = vstorage - > num_levels ( ) - 1 ;
} else if ( sorted_runs [ first_index_after ] . level = = 0 ) {
if ( first_index_after = = sorted_runs_ . size ( ) ) {
output_level = vstorage_ - > num_levels ( ) - 1 ;
} else if ( sorted_runs_ [ first_index_after ] . level = = 0 ) {
output_level = 0 ;
} else {
output_level = sorted_runs [ first_index_after ] . level - 1 ;
output_level = sorted_runs_ [ first_index_after ] . level - 1 ;
}
// last level is reserved for the files ingested behind
if ( ioptions_ . allow_ingest_behind & &
( output_level = = vstorage - > num_levels ( ) - 1 ) ) {
( output_level = = vstorage_ - > num_levels ( ) - 1 ) ) {
assert ( output_level > 1 ) ;
output_level - - ;
}
std : : vector < CompactionInputFiles > inputs ( vstorage - > num_levels ( ) ) ;
std : : vector < CompactionInputFiles > inputs ( vstorage_ - > num_levels ( ) ) ;
for ( size_t i = 0 ; i < inputs . size ( ) ; + + i ) {
inputs [ i ] . level = start_level + static_cast < int > ( i ) ;
}
for ( size_t i = start_index ; i < first_index_after ; i + + ) {
auto & picking_sr = sorted_runs [ i ] ;
auto & picking_sr = sorted_runs_ [ i ] ;
if ( picking_sr . level = = 0 ) {
FileMetaData * picking_file = picking_sr . file ;
inputs [ 0 ] . files . push_back ( picking_file ) ;
} else {
auto & files = inputs [ picking_sr . level - start_level ] . files ;
for ( auto * f : vstorage - > LevelFiles ( picking_sr . level ) ) {
for ( auto * f : vstorage_ - > LevelFiles ( picking_sr . level ) ) {
files . push_back ( f ) ;
}
}
char file_num_buf [ 256 ] ;
picking_sr . DumpSizeInfo ( file_num_buf , sizeof ( file_num_buf ) , i ) ;
ROCKS_LOG_BUFFER ( log_buffer , " [%s] Universal: Picking %s " , cf_name . c_str ( ) ,
file_num_buf ) ;
ROCKS_LOG_BUFFER ( log_buffer_ , " [%s] Universal: Picking %s " ,
cf_name_ . c_str ( ) , file_num_buf ) ;
}
CompactionReason compaction_reason ;
@ -626,16 +716,17 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
compaction_reason = CompactionReason : : kUniversalSortedRunNum ;
}
return new Compaction (
vstorage , ioptions_ , mutable_cf_options , std : : move ( inputs ) , output_level ,
MaxFileSizeForLevel ( mutable_cf_options , output_level ,
vstorage_ , ioptions_ , mutable_cf_options_ , std : : move ( inputs ) ,
output_level ,
MaxFileSizeForLevel ( mutable_cf_options_ , output_level ,
kCompactionStyleUniversal ) ,
LLONG_MAX , path_id ,
GetCompressionType ( ioptions_ , vstorage , mutable_cf_options , start_level ,
GetCompressionType ( ioptions_ , vstorage_ , mutable_cf_options_ , start_level ,
1 , enable_compression ) ,
GetCompressionOptions ( ioptions_ , vstorage , start_level ,
GetCompressionOptions ( ioptions_ , vstorage_ , start_level ,
enable_compression ) ,
/* max_subcompactions */ 0 , /* grandparents */ { } , /* is manual */ false ,
score , false /* deletion_compaction */ , compaction_reason ) ;
score_ , false /* deletion_compaction */ , compaction_reason ) ;
}
// Look at overall size amplification. If size amplification
@ -644,12 +735,9 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
// base file (overrides configured values of file-size ratios,
// min_merge_width and max_merge_width).
//
Compaction * UniversalCompactionPicker : : PickCompactionToReduceSizeAmp (
const std : : string & cf_name , const MutableCFOptions & mutable_cf_options ,
VersionStorageInfo * vstorage , double score ,
const std : : vector < SortedRun > & sorted_runs , LogBuffer * log_buffer ) {
Compaction * UniversalCompactionBuilder : : PickCompactionToReduceSizeAmp ( ) {
// percentage flexibility while reducing size amplification
uint64_t ratio = mutable_cf_options . compaction_options_universal
uint64_t ratio = mutable_cf_options_ . compaction_options_universal
. max_size_amplification_percent ;
unsigned int candidate_count = 0 ;
@ -657,21 +745,23 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
size_t start_index = 0 ;
const SortedRun * sr = nullptr ;
if ( sorted_runs . back ( ) . being_compacted ) {
assert ( ! sorted_runs_ . empty ( ) ) ;
if ( sorted_runs_ . back ( ) . being_compacted ) {
return nullptr ;
}
// Skip files that are already being compacted
for ( size_t loop = 0 ; loop < sorted_runs . size ( ) - 1 ; loop + + ) {
sr = & sorted_runs [ loop ] ;
for ( size_t loop = 0 ; loop < sorted_runs_ . size ( ) - 1 ; loop + + ) {
sr = & sorted_runs_ [ loop ] ;
if ( ! sr - > being_compacted ) {
start_index = loop ; // Consider this as the first candidate.
break ;
}
char file_num_buf [ kFormatFileNumberBufSize ] ;
sr - > Dump ( file_num_buf , sizeof ( file_num_buf ) , true ) ;
ROCKS_LOG_BUFFER ( log_buffer , " [%s] Universal: skipping %s[%d] compacted %s " ,
cf_name . c_str ( ) , file_num_buf , loop ,
ROCKS_LOG_BUFFER ( log_buffer_ ,
" [%s] Universal: skipping %s[%d] compacted %s " ,
cf_name_ . c_str ( ) , file_num_buf , loop ,
" cannot be a candidate to reduce size amp. \n " ) ;
sr = nullptr ;
}
@ -683,20 +773,20 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
char file_num_buf [ kFormatFileNumberBufSize ] ;
sr - > Dump ( file_num_buf , sizeof ( file_num_buf ) , true ) ;
ROCKS_LOG_BUFFER (
log_buffer ,
log_buffer_ ,
" [%s] Universal: First candidate %s[% " ROCKSDB_PRIszt " ] %s " ,
cf_name . c_str ( ) , file_num_buf , start_index , " to reduce size amp. \n " ) ;
cf_name_ . c_str ( ) , file_num_buf , start_index , " to reduce size amp. \n " ) ;
}
// keep adding up all the remaining files
for ( size_t loop = start_index ; loop < sorted_runs . size ( ) - 1 ; loop + + ) {
sr = & sorted_runs [ loop ] ;
for ( size_t loop = start_index ; loop < sorted_runs_ . size ( ) - 1 ; loop + + ) {
sr = & sorted_runs_ [ loop ] ;
if ( sr - > being_compacted ) {
char file_num_buf [ kFormatFileNumberBufSize ] ;
sr - > Dump ( file_num_buf , sizeof ( file_num_buf ) , true ) ;
ROCKS_LOG_BUFFER (
log_buffer , " [%s] Universal: Possible candidate %s[%d] %s " ,
cf_name . c_str ( ) , file_num_buf , start_index ,
log_buffer_ , " [%s] Universal: Possible candidate %s[%d] %s " ,
cf_name_ . c_str ( ) , file_num_buf , start_index ,
" is already being compacted. No size amp reduction possible. \n " ) ;
return nullptr ;
}
@ -708,58 +798,58 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
}
// size of earliest file
uint64_t earliest_file_size = sorted_runs . back ( ) . size ;
uint64_t earliest_file_size = sorted_runs_ . back ( ) . size ;
// size amplification = percentage of additional size
if ( candidate_size * 100 < ratio * earliest_file_size ) {
ROCKS_LOG_BUFFER (
log_buffer ,
log_buffer_ ,
" [%s] Universal: size amp not needed. newer-files-total-size % " PRIu64
" earliest-file-size % " PRIu64 ,
cf_name . c_str ( ) , candidate_size , earliest_file_size ) ;
cf_name_ . c_str ( ) , candidate_size , earliest_file_size ) ;
return nullptr ;
} else {
ROCKS_LOG_BUFFER (
log_buffer ,
log_buffer_ ,
" [%s] Universal: size amp needed. newer-files-total-size % " PRIu64
" earliest-file-size % " PRIu64 ,
cf_name . c_str ( ) , candidate_size , earliest_file_size ) ;
cf_name_ . c_str ( ) , candidate_size , earliest_file_size ) ;
}
assert ( start_index < sorted_runs . size ( ) - 1 ) ;
assert ( start_index < sorted_runs_ . size ( ) - 1 ) ;
// Estimate total file size
uint64_t estimated_total_size = 0 ;
for ( size_t loop = start_index ; loop < sorted_runs . size ( ) ; loop + + ) {
estimated_total_size + = sorted_runs [ loop ] . size ;
for ( size_t loop = start_index ; loop < sorted_runs_ . size ( ) ; loop + + ) {
estimated_total_size + = sorted_runs_ [ loop ] . size ;
}
uint32_t path_id =
GetPathId ( ioptions_ , mutable_cf_options , estimated_total_size ) ;
int start_level = sorted_runs [ start_index ] . level ;
GetPathId ( ioptions_ , mutable_cf_options_ , estimated_total_size ) ;
int start_level = sorted_runs_ [ start_index ] . level ;
std : : vector < CompactionInputFiles > inputs ( vstorage - > num_levels ( ) ) ;
std : : vector < CompactionInputFiles > inputs ( vstorage_ - > num_levels ( ) ) ;
for ( size_t i = 0 ; i < inputs . size ( ) ; + + i ) {
inputs [ i ] . level = start_level + static_cast < int > ( i ) ;
}
// We always compact all the files, so always compress.
for ( size_t loop = start_index ; loop < sorted_runs . size ( ) ; loop + + ) {
auto & picking_sr = sorted_runs [ loop ] ;
for ( size_t loop = start_index ; loop < sorted_runs_ . size ( ) ; loop + + ) {
auto & picking_sr = sorted_runs_ [ loop ] ;
if ( picking_sr . level = = 0 ) {
FileMetaData * f = picking_sr . file ;
inputs [ 0 ] . files . push_back ( f ) ;
} else {
auto & files = inputs [ picking_sr . level - start_level ] . files ;
for ( auto * f : vstorage - > LevelFiles ( picking_sr . level ) ) {
for ( auto * f : vstorage_ - > LevelFiles ( picking_sr . level ) ) {
files . push_back ( f ) ;
}
}
char file_num_buf [ 256 ] ;
picking_sr . DumpSizeInfo ( file_num_buf , sizeof ( file_num_buf ) , loop ) ;
ROCKS_LOG_BUFFER ( log_buffer , " [%s] Universal: size amp picking %s " ,
cf_name . c_str ( ) , file_num_buf ) ;
ROCKS_LOG_BUFFER ( log_buffer_ , " [%s] Universal: size amp picking %s " ,
cf_name_ . c_str ( ) , file_num_buf ) ;
}
// output files at the bottom most level, unless it's reserved
int output_level = vstorage - > num_levels ( ) - 1 ;
int output_level = vstorage_ - > num_levels ( ) - 1 ;
// last level is reserved for the files ingested behind
if ( ioptions_ . allow_ingest_behind ) {
assert ( output_level > 1 ) ;
@ -767,29 +857,27 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
}
return new Compaction (
vstorage , ioptions_ , mutable_cf_options , std : : move ( inputs ) , output_level ,
MaxFileSizeForLevel ( mutable_cf_options , output_level ,
vstorage_ , ioptions_ , mutable_cf_options_ , std : : move ( inputs ) ,
output_level ,
MaxFileSizeForLevel ( mutable_cf_options_ , output_level ,
kCompactionStyleUniversal ) ,
/* max_grandparent_overlap_bytes */ LLONG_MAX , path_id ,
GetCompressionType ( ioptions_ , vstorage , mutable_cf_options , output _level ,
1 ) ,
GetCompressionOptions ( ioptions_ , vstorage , output_level ) ,
GetCompressionType ( ioptions_ , vstorage_ , mutable_cf_options_ ,
output_level , 1 ) ,
GetCompressionOptions ( ioptions_ , vstorage_ , output_level ) ,
/* max_subcompactions */ 0 , /* grandparents */ { } , /* is manual */ false ,
score , false /* deletion_compaction */ ,
score_ , false /* deletion_compaction */ ,
CompactionReason : : kUniversalSizeAmplification ) ;
}
// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction * UniversalCompactionPicker : : PickDeleteTriggeredCompaction (
const std : : string & cf_name , const MutableCFOptions & mutable_cf_options ,
VersionStorageInfo * vstorage , double score ,
const std : : vector < SortedRun > & /*sorted_runs*/ , LogBuffer * /*log_buffer*/ ) {
Compaction * UniversalCompactionBuilder : : PickDeleteTriggeredCompaction ( ) {
CompactionInputFiles start_level_inputs ;
int output_level ;
std : : vector < CompactionInputFiles > inputs ;
if ( vstorage - > num_levels ( ) = = 1 ) {
if ( vstorage_ - > num_levels ( ) = = 1 ) {
// This is single level universal. Since we're basically trying to reclaim
// space by processing files marked for compaction due to high tombstone
// density, let's do the same thing as compaction to reduce size amp which
@ -799,7 +887,7 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
start_level_inputs . level = 0 ;
start_level_inputs . files . clear ( ) ;
output_level = 0 ;
for ( FileMetaData * f : vstorage - > LevelFiles ( 0 ) ) {
for ( FileMetaData * f : vstorage_ - > LevelFiles ( 0 ) ) {
if ( f - > marked_for_compaction ) {
compact = true ;
}
@ -818,24 +906,24 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
// For multi-level universal, the strategy is to make this look more like
// leveled. We pick one of the files marked for compaction and compact with
// overlapping files in the adjacent level.
PickFilesMarkedForCompaction ( cf_name , vstorage , & start_level , & outpu t_level,
& start_level_inputs ) ;
picker_ - > PickFilesMarkedForCompaction ( cf_name_ , vstorage_ , & star t_level,
& output_level , & start_level_inputs ) ;
if ( start_level_inputs . empty ( ) ) {
return nullptr ;
}
// Pick the first non-empty level after the start_level
for ( output_level = start_level + 1 ; output_level < vstorage - > num_levels ( ) ;
for ( output_level = start_level + 1 ; output_level < vstorage_ - > num_levels ( ) ;
output_level + + ) {
if ( vstorage - > NumLevelFiles ( output_level ) ! = 0 ) {
if ( vstorage_ - > NumLevelFiles ( output_level ) ! = 0 ) {
break ;
}
}
// If all higher levels are empty, pick the highest level as output level
if ( output_level = = vstorage - > num_levels ( ) ) {
if ( output_level = = vstorage_ - > num_levels ( ) ) {
if ( start_level = = 0 ) {
output_level = vstorage - > num_levels ( ) - 1 ;
output_level = vstorage_ - > num_levels ( ) - 1 ;
} else {
// If start level is non-zero and all higher levels are empty, this
// compaction will translate into a trivial move. Since the idea is
@ -845,15 +933,15 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
}
}
if ( ioptions_ . allow_ingest_behind & &
output_level = = vstorage - > num_levels ( ) - 1 ) {
output_level = = vstorage_ - > num_levels ( ) - 1 ) {
assert ( output_level > 1 ) ;
output_level - - ;
}
if ( output_level ! = 0 ) {
if ( start_level = = 0 ) {
if ( ! GetOverlappingL0Files ( vstorage , & start_level_inputs , output_level ,
nullptr ) ) {
if ( ! picker_ - > GetOverlappingL0Files ( vstorage_ , & start_level_inputs ,
output_level , nullptr ) ) {
return nullptr ;
}
}
@ -862,16 +950,16 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
int parent_index = - 1 ;
output_level_inputs . level = output_level ;
if ( ! SetupOtherInputs ( cf_name , mutable_cf_options , vstorage ,
& start_level_inputs , & output_level_inputs ,
& parent_index , - 1 ) ) {
if ( ! picker_ - > SetupOtherInputs ( cf_name_ , mutable_cf_options_ , vstorage_ ,
& start_level_inputs , & output_level_inputs ,
& parent_index , - 1 ) ) {
return nullptr ;
}
inputs . push_back ( start_level_inputs ) ;
if ( ! output_level_inputs . empty ( ) ) {
inputs . push_back ( output_level_inputs ) ;
}
if ( FilesRangeOverlapWithCompaction ( inputs , output_level ) ) {
if ( picker_ - > FilesRangeOverlapWithCompaction ( inputs , output_level ) ) {
return nullptr ;
}
} else {
@ -881,21 +969,22 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
uint64_t estimated_total_size = 0 ;
// Use size of the output level as estimated file size
for ( FileMetaData * f : vstorage - > LevelFiles ( output_level ) ) {
for ( FileMetaData * f : vstorage_ - > LevelFiles ( output_level ) ) {
estimated_total_size + = f - > fd . GetFileSize ( ) ;
}
uint32_t path_id =
GetPathId ( ioptions_ , mutable_cf_options , estimated_total_size ) ;
GetPathId ( ioptions_ , mutable_cf_options_ , estimated_total_size ) ;
return new Compaction (
vstorage , ioptions_ , mutable_cf_options , std : : move ( inputs ) , output_level ,
MaxFileSizeForLevel ( mutable_cf_options , output_level ,
vstorage_ , ioptions_ , mutable_cf_options_ , std : : move ( inputs ) ,
output_level ,
MaxFileSizeForLevel ( mutable_cf_options_ , output_level ,
kCompactionStyleUniversal ) ,
/* max_grandparent_overlap_bytes */ LLONG_MAX , path_id ,
GetCompressionType ( ioptions_ , vstorage , mutable_cf_options , output _level ,
1 ) ,
GetCompressionOptions ( ioptions_ , vstorage , output_level ) ,
GetCompressionType ( ioptions_ , vstorage_ , mutable_cf_options_ ,
output_level , 1 ) ,
GetCompressionOptions ( ioptions_ , vstorage_ , output_level ) ,
/* max_subcompactions */ 0 , /* grandparents */ { } , /* is manual */ true ,
score , false /* deletion_compaction */ ,
score_ , false /* deletion_compaction */ ,
CompactionReason : : kFilesMarkedForCompaction ) ;
}
} // namespace rocksdb