@ -15,9 +15,13 @@
# include <inttypes.h>
# include <algorithm>
# include <functional>
# include <vector>
# include <memory>
# include <list>
# include <set>
# include <thread>
# include <utility>
# include "db/builder.h"
# include "db/db_iter.h"
@ -58,7 +62,7 @@
namespace rocksdb {
// Maintains state for each sub-compaction
struct CompactionJob : : SubC ompactionState {
struct CompactionJob : : Subc ompactionState {
Compaction * compaction ;
// The boundaries of the key-range this compaction is interested in. No two
@ -66,10 +70,10 @@ struct CompactionJob::SubCompactionState {
// 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
Slice * start , * end ;
// The return status of this compaction
// The return status of this sub compaction
Status status ;
// Files produced by compaction
// Files produced by this sub compaction
struct Output {
uint64_t number ;
uint32_t path_id ;
@ -88,7 +92,7 @@ struct CompactionJob::SubCompactionState {
// This subcompaction's ouptut could be empty if compaction was aborted
// before this subcompaction had a chance to generate any output files.
// When subcompactions are executed sequentially this is more likely and
// will be particulalry likely for the las t subcompaction to be empty.
// will be particulalry likely for the later subcompactions to be empty.
// Once they are run in parallel however it should be much rarer.
return nullptr ;
} else {
@ -96,11 +100,12 @@ struct CompactionJob::SubCompactionState {
}
}
// State during the sub- compaction
// State during the subcompaction
uint64_t total_bytes ;
uint64_t num_input_records ;
uint64_t num_output_records ;
CompactionJobStats compaction_job_stats ;
uint64_t approx_size ;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
@ -110,7 +115,8 @@ struct CompactionJob::SubCompactionState {
// is in or beyond the last file checked during the previous call
std : : vector < size_t > level_ptrs ;
SubCompactionState ( Compaction * c , Slice * _start , Slice * _end )
SubcompactionState ( Compaction * c , Slice * _start , Slice * _end ,
uint64_t size = 0 )
: compaction ( c ) ,
start ( _start ) ,
end ( _end ) ,
@ -118,16 +124,15 @@ struct CompactionJob::SubCompactionState {
builder ( nullptr ) ,
total_bytes ( 0 ) ,
num_input_records ( 0 ) ,
num_output_records ( 0 ) {
num_output_records ( 0 ) ,
approx_size ( size ) {
assert ( compaction ! = nullptr ) ;
level_ptrs = std : : vector < size_t > ( compaction - > number_levels ( ) , 0 ) ;
}
SubCompactionState ( SubCompactionState & & o ) {
* this = std : : move ( o ) ;
}
SubcompactionState ( SubcompactionState & & o ) { * this = std : : move ( o ) ; }
SubC ompactionState & operator = ( SubC ompactionState & & o ) {
SubcompactionState & operator = ( SubcompactionState & & o ) {
compaction = std : : move ( o . compaction ) ;
start = std : : move ( o . start ) ;
end = std : : move ( o . end ) ;
@ -138,14 +143,16 @@ struct CompactionJob::SubCompactionState {
total_bytes = std : : move ( o . total_bytes ) ;
num_input_records = std : : move ( o . num_input_records ) ;
num_output_records = std : : move ( o . num_output_records ) ;
compaction_job_stats = std : : move ( o . compaction_job_stats ) ;
approx_size = std : : move ( o . approx_size ) ;
level_ptrs = std : : move ( o . level_ptrs ) ;
return * this ;
}
// Because member unique_ptrs do not have these.
SubCompactionState ( const SubC ompactionState & ) = delete ;
SubcompactionState ( const Subc ompactionState & ) = delete ;
SubC ompactionState & operator = ( const SubC ompactionState & ) = delete ;
Subc ompactionState & operator = ( const Subc ompactionState & ) = delete ;
} ;
// Maintains state for the entire compaction
@ -154,7 +161,7 @@ struct CompactionJob::CompactionState {
// REQUIRED: subcompaction states are stored in order of increasing
// key-range
std : : vector < CompactionJob : : SubC ompactionState > sub_compact_states ;
std : : vector < CompactionJob : : Subc ompactionState > sub_compact_states ;
Status status ;
uint64_t total_bytes ;
@ -176,13 +183,11 @@ struct CompactionJob::CompactionState {
}
Slice SmallestUserKey ( ) {
for ( size_t i = 0 ; i < sub_compact_states . size ( ) ; i + + ) {
if ( ! sub_compact_states [ i ] . outputs . empty ( ) ) {
return sub_compact_states [ i ] . outputs [ 0 ] . smallest . user_key ( ) ;
for ( auto & s : sub_compact_states ) {
if ( ! s . outputs . empty ( ) ) {
return s . outputs [ 0 ] . smallest . user_key ( ) ;
}
}
// TODO(aekmekji): should we exit with an error if it reaches here?
assert ( 0 ) ;
return Slice ( nullptr , 0 ) ;
}
@ -193,19 +198,18 @@ struct CompactionJob::CompactionState {
return sub_compact_states [ i ] . current_output ( ) - > largest . user_key ( ) ;
}
}
// TODO(aekmekji): should we exit with an error if it reaches here?
assert ( 0 ) ;
return Slice ( nullptr , 0 ) ;
}
} ;
void CompactionJob : : AggregateStatistics ( ) {
for ( SubC ompactionState & sc : compact_ - > sub_compact_states ) {
for ( Subc ompactionState & sc : compact_ - > sub_compact_states ) {
compact_ - > total_bytes + = sc . total_bytes ;
compact_ - > num_input_records + = sc . num_input_records ;
compact_ - > num_output_records + = sc . num_output_records ;
if ( compaction_job_stats_ ) {
}
if ( compaction_job_stats_ ) {
for ( SubcompactionState & sc : compact_ - > sub_compact_states ) {
compaction_job_stats_ - > Add ( sc . compaction_job_stats ) ;
}
}
@ -325,78 +329,141 @@ void CompactionJob::Prepare() {
earliest_snapshot_ = existing_snapshots_ [ 0 ] ;
}
InitializeSubCompactions ( ) ;
if ( c - > ShouldFormSubcompactions ( ) ) {
const uint64_t start_micros = env_ - > NowMicros ( ) ;
GenSubcompactionBoundaries ( ) ;
MeasureTime ( stats_ , SUBCOMPACTION_SETUP_TIME ,
env_ - > NowMicros ( ) - start_micros ) ;
assert ( sizes_ . size ( ) = = boundaries_ . size ( ) + 1 ) ;
for ( size_t i = 0 ; i < = boundaries_ . size ( ) ; i + + ) {
Slice * start = i = = 0 ? nullptr : & boundaries_ [ i - 1 ] ;
Slice * end = i = = boundaries_ . size ( ) ? nullptr : & boundaries_ [ i ] ;
compact_ - > sub_compact_states . emplace_back ( c , start , end , sizes_ [ i ] ) ;
}
} else {
compact_ - > sub_compact_states . emplace_back ( c , nullptr , nullptr ) ;
}
}
// For L0-L1 compaction, iterators work in parallel by processing
// different subsets of the full key range. This function sets up
// the local states used by each of these subcompactions during
// their execution
void CompactionJob : : InitializeSubCompactions ( ) {
Compaction * c = compact_ - > compaction ;
auto & bounds = sub_compaction_boundaries_ ;
if ( c - > IsSubCompaction ( ) ) {
auto * cmp = c - > column_family_data ( ) - > user_comparator ( ) ;
for ( size_t which = 0 ; which < c - > num_input_levels ( ) ; which + + ) {
if ( c - > level ( which ) = = 1 ) {
const LevelFilesBrief * flevel = c - > input_levels ( which ) ;
size_t num_files = flevel - > num_files ;
if ( num_files > 1 ) {
std : : vector < Slice > candidates ;
auto & files = flevel - > files ;
Slice global_min = ExtractUserKey ( files [ 0 ] . smallest_key ) ;
Slice global_max = ExtractUserKey ( files [ num_files - 1 ] . largest_key ) ;
struct RangeWithSize {
Range range ;
uint64_t size ;
for ( size_t i = 1 ; i < num_files ; i + + ) {
// Make sure the smallest key in two consecutive L1 files are
// unique before adding the smallest key as a boundary. Also ensure
// that the boundary won't lead to an empty subcompaction (happens
// if the boundary == the smallest or largest key)
Slice s1 = ExtractUserKey ( files [ i ] . smallest_key ) ;
Slice s2 = i = = num_files - 1
? Slice ( )
: ExtractUserKey ( files [ i + 1 ] . smallest_key ) ;
if ( ( i = = num_files - 1 & & cmp - > Compare ( s1 , global_max ) < 0 )
| | ( i < num_files - 1 & & cmp - > Compare ( s1 , s2 ) < 0 & &
cmp - > Compare ( s1 , global_min ) > 0 ) ) {
candidates . emplace_back ( s1 ) ;
}
}
RangeWithSize ( const Slice & a , const Slice & b , uint64_t s = 0 )
: range ( a , b ) , size ( s ) { }
} ;
bool SliceCompare ( const Comparator * cmp , const Slice & a , const Slice & b ) {
// Returns true if a < b
return cmp - > Compare ( ExtractUserKey ( a ) , ExtractUserKey ( b ) ) < 0 ;
}
// Generates a histogram representing potential divisions of key ranges from
// the input. It adds the starting and/or ending keys of certain input files
// to the working set and then finds the approximate size of data in between
// each consecutive pair of slices. Then it divides these ranges into
// consecutive groups such that each group has a similar size.
void CompactionJob : : GenSubcompactionBoundaries ( ) {
auto * c = compact_ - > compaction ;
auto * cfd = c - > column_family_data ( ) ;
std : : set < Slice , std : : function < bool ( const Slice & a , const Slice & b ) > > bounds (
std : : bind ( & SliceCompare , cfd - > user_comparator ( ) , std : : placeholders : : _1 ,
std : : placeholders : : _2 ) ) ;
int start_lvl = c - > start_level ( ) ;
int out_lvl = c - > output_level ( ) ;
// Add the starting and/or ending key of certain input files as a potential
// boundary (because we're inserting into a set, it avoids duplicates)
for ( size_t lvl_idx = 0 ; lvl_idx < c - > num_input_levels ( ) ; lvl_idx + + ) {
int lvl = c - > level ( lvl_idx ) ;
if ( lvl > = start_lvl & & lvl < = out_lvl ) {
const LevelFilesBrief * flevel = c - > input_levels ( lvl_idx ) ;
size_t num_files = flevel - > num_files ;
if ( num_files = = 0 ) {
break ;
}
// Divide the potential L1 file boundaries (those that passed the
// checks above) into 'max_subcompactions' groups such that each have
// as close to an equal number of files in it as possible
// TODO(aekmekji): refine this later to depend on file size
size_t files_left = candidates . size ( ) ;
size_t subcompactions_left =
static_cast < size_t > ( db_options_ . max_subcompactions ) < files_left
? db_options_ . max_subcompactions
: files_left ;
size_t num_to_include ;
size_t index = 0 ;
while ( files_left > 1 & & subcompactions_left > 1 ) {
num_to_include = files_left / subcompactions_left ;
index + = num_to_include ;
sub_compaction_boundaries_ . emplace_back ( candidates [ index ] ) ;
files_left - = num_to_include ;
subcompactions_left - - ;
if ( lvl = = 0 ) {
// For level 0 add the starting and ending key of each file since th e
// files may have greatly differing key ranges (not range-partitioned)
for ( size_t i = 0 ; i < num_files ; i + + ) {
bounds . emplace ( flevel - > files [ i ] . smallest_key ) ;
bounds . emplace ( flevel - > files [ i ] . largest_key ) ;
}
} else {
// For all other levels add the smallest/largest key in the level to
// encompass the range covered by that level
bounds . emplace ( flevel - > files [ 0 ] . smallest_key ) ;
bounds . emplace ( flevel - > files [ num_files - 1 ] . largest_key ) ;
if ( lvl = = out_lvl ) {
// For the last level include the starting keys of all files since
// the last level is the largest and probably has the widest key
// range. Since it's range partitioned, the ending key of one file
// and the starting key of the next are very close (or identical).
for ( size_t i = 1 ; i < num_files ; i + + ) {
bounds . emplace ( flevel - > files [ i ] . smallest_key ) ;
}
}
break ;
}
}
}
// Note: it's necessary for the first iterator sub-range to have
// start == nullptr and for the last to have end == nullptr
for ( size_t i = 0 ; i < = bounds . size ( ) ; i + + ) {
Slice * start = i = = 0 ? nullptr : & bounds [ i - 1 ] ;
Slice * end = i = = bounds . size ( ) ? nullptr : & bounds [ i ] ;
compact_ - > sub_compact_states . emplace_back ( compact_ - > compaction , start , end ) ;
// Combine consecutive pairs of boundaries into ranges with an approximate
// size of data covered by keys in that range
uint64_t sum = 0 ;
std : : vector < RangeWithSize > ranges ;
auto * v = cfd - > current ( ) ;
for ( auto it = bounds . begin ( ) ; ; ) {
const Slice a = * it ;
it + + ;
if ( it = = bounds . end ( ) ) {
break ;
}
const Slice b = * it ;
uint64_t size = versions_ - > ApproximateSize ( v , a , b , start_lvl , out_lvl + 1 ) ;
ranges . emplace_back ( a , b , size ) ;
sum + = size ;
}
// Group the ranges into subcompactions
const double min_file_fill_percent = 4.0 / 5 ;
uint64_t max_output_files = std : : ceil (
sum / min_file_fill_percent /
cfd - > GetCurrentMutableCFOptions ( ) - > MaxFileSizeForLevel ( out_lvl ) ) ;
uint64_t subcompactions =
std : : min ( { static_cast < uint64_t > ( ranges . size ( ) ) ,
static_cast < uint64_t > ( db_options_ . max_subcompactions ) ,
max_output_files } ) ;
double mean = sum * 1.0 / subcompactions ;
if ( subcompactions > 1 ) {
// Greedily add ranges to the subcompaction until the sum of the ranges'
// sizes becomes >= the expected mean size of a subcompaction
sum = 0 ;
for ( size_t i = 0 ; i < ranges . size ( ) - 1 ; i + + ) {
if ( subcompactions = = 1 ) {
// If there's only one left to schedule then it goes to the end so no
// need to put an end boundary
break ;
}
sum + = ranges [ i ] . size ;
if ( sum > = mean ) {
boundaries_ . emplace_back ( ExtractUserKey ( ranges [ i ] . range . limit ) ) ;
sizes_ . emplace_back ( sum ) ;
subcompactions - - ;
sum = 0 ;
}
}
sizes_ . emplace_back ( sum + ranges . back ( ) . size ) ;
} else {
// Only one range so its size is the total sum of sizes computed above
sizes_ . emplace_back ( sum ) ;
}
}
@ -407,15 +474,35 @@ Status CompactionJob::Run() {
log_buffer_ - > FlushBufferToLog ( ) ;
LogCompaction ( ) ;
// Run each subcompaction sequentially
const size_t num_threads = compact_ - > sub_compact_states . size ( ) ;
assert ( num_threads > 0 ) ;
const uint64_t start_micros = env_ - > NowMicros ( ) ;
for ( size_t i = 0 ; i < compact_ - > sub_compact_states . size ( ) ; i + + ) {
ProcessKeyValueCompaction ( & compact_ - > sub_compact_states [ i ] ) ;
// Launch a thread for each of subcompactions 1...num_threads-1
std : : vector < std : : thread > thread_pool ;
thread_pool . reserve ( num_threads - 1 ) ;
for ( size_t i = 1 ; i < compact_ - > sub_compact_states . size ( ) ; i + + ) {
thread_pool . emplace_back ( & CompactionJob : : ProcessKeyValueCompaction , this ,
& compact_ - > sub_compact_states [ i ] ) ;
}
// Always schedule the first subcompaction (whether or not there are also
// others) in the current thread to be efficient with resources
ProcessKeyValueCompaction ( & compact_ - > sub_compact_states [ 0 ] ) ;
// Wait for all other threads (if there are any) to finish execution
for ( auto & thread : thread_pool ) {
thread . join ( ) ;
}
if ( output_directory_ & & ! db_options_ . disableDataSync ) {
output_directory_ - > Fsync ( ) ;
}
compaction_stats_ . micros = env_ - > NowMicros ( ) - start_micros ;
MeasureTime ( stats_ , COMPACTION_TIME , compaction_stats_ . micros ) ;
// Determine if any of the subcompactions failed
// Check if any thread encountered an error during execution
Status status ;
for ( const auto & state : compact_ - > sub_compact_states ) {
if ( ! state . status . ok ( ) ) {
@ -485,7 +572,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
< < " num_output_files " < < compact_ - > NumOutputFiles ( )
< < " total_output_size " < < compact_ - > total_bytes
< < " num_input_records " < < compact_ - > num_input_records
< < " num_output_records " < < compact_ - > num_output_records ;
< < " num_output_records " < < compact_ - > num_output_records
< < " num_subcompactions " < < compact_ - > sub_compact_states . size ( ) ;
if ( measure_io_stats_ & & compaction_job_stats_ ! = nullptr ) {
stream < < " file_write_nanos " < < compaction_job_stats_ - > file_write_nanos ;
@ -507,7 +595,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
return status ;
}
void CompactionJob : : ProcessKeyValueCompaction ( SubC ompactionState * sub_compact ) {
void CompactionJob : : ProcessKeyValueCompaction ( Subc ompactionState * sub_compact ) {
assert ( sub_compact ! = nullptr ) ;
std : : unique_ptr < Iterator > input_ptr (
versions_ - > MakeInputIterator ( sub_compact - > compaction ) ) ;
@ -764,10 +852,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
if ( status . ok ( ) ) {
status = input - > status ( ) ;
}
if ( output_directory_ & & ! db_options_ . disableDataSync ) {
// TODO(aekmekji): Maybe only call once after all subcompactions complete?
output_directory_ - > Fsync ( ) ;
}
if ( measure_io_stats_ ) {
sub_compact - > compaction_job_stats . file_write_nanos + =
@ -788,9 +872,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
}
Status CompactionJob : : WriteKeyValue ( const Slice & key , const Slice & value ,
const ParsedInternalKey & ikey , const Status & input_status ,
SubCompactionState * sub_compact ) {
const ParsedInternalKey & ikey ,
const Status & input_status ,
SubcompactionState * sub_compact ) {
Slice newkey ( key . data ( ) , key . size ( ) ) ;
std : : string kstr ;
@ -833,6 +917,10 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
std : : max ( sub_compact - > current_output ( ) - > largest_seqno , seqno ) ;
// Close output file if it is big enough
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB)
Status status ;
if ( sub_compact - > builder - > FileSize ( ) > =
sub_compact - > compaction - > max_output_file_size ( ) ) {
@ -867,8 +955,8 @@ void CompactionJob::RecordDroppedKeys(
}
}
Status CompactionJob : : FinishCompactionOutputFile ( const Status & input_status ,
SubC ompactionState * sub_compact ) {
Status CompactionJob : : FinishCompactionOutputFile (
const Status & input_status , Subc ompactionState* sub_compact ) {
AutoThreadOperationStageUpdater stage_updater (
ThreadStatus : : STAGE_COMPACTION_SYNC_FILE ) ;
assert ( sub_compact ! = nullptr ) ;
@ -975,9 +1063,9 @@ Status CompactionJob::InstallCompactionResults(
// Add compaction outputs
compaction - > AddInputDeletions ( compact_ - > compaction - > edit ( ) ) ;
for ( SubC ompactionState & sub_compact : compact_ - > sub_compact_states ) {
for ( Subc ompactionState & sub_compact : compact_ - > sub_compact_states ) {
for ( size_t i = 0 ; i < sub_compact . outputs . size ( ) ; i + + ) {
const SubC ompactionState : : Output & out = sub_compact . outputs [ i ] ;
const Subc ompactionState : : Output & out = sub_compact . outputs [ i ] ;
compaction - > edit ( ) - > AddFile ( compaction - > output_level ( ) , out . number ,
out . path_id , out . file_size , out . smallest ,
out . largest , out . smallest_seqno ,
@ -1028,8 +1116,8 @@ void CompactionJob::RecordCompactionIOStats() {
IOSTATS_RESET ( bytes_written ) ;
}
Status CompactionJob : : OpenCompactionOutputFile ( SubCompactionState *
sub_compact ) {
Status CompactionJob : : OpenCompactionOutputFile (
SubcompactionState * sub_compact ) {
assert ( sub_compact ! = nullptr ) ;
assert ( sub_compact - > builder = = nullptr ) ;
// no need to lock because VersionSet::next_file_number_ is atomic
@ -1048,7 +1136,7 @@ Status CompactionJob::OpenCompactionOutputFile(SubCompactionState*
LogFlush ( db_options_ . info_log ) ;
return s ;
}
SubC ompactionState : : Output out ;
Subc ompactionState : : Output out ;
out . number = file_number ;
out . path_id = sub_compact - > compaction - > output_path_id ( ) ;
out . smallest . Clear ( ) ;
@ -1083,7 +1171,7 @@ Status CompactionJob::OpenCompactionOutputFile(SubCompactionState*
}
void CompactionJob : : CleanupCompaction ( ) {
for ( SubC ompactionState & sub_compact : compact_ - > sub_compact_states ) {
for ( Subc ompactionState & sub_compact : compact_ - > sub_compact_states ) {
const auto & sub_status = sub_compact . status ;
if ( sub_compact . builder ! = nullptr ) {
@ -1094,7 +1182,7 @@ void CompactionJob::CleanupCompaction() {
assert ( ! sub_status . ok ( ) | | sub_compact . outfile = = nullptr ) ;
}
for ( size_t i = 0 ; i < sub_compact . outputs . size ( ) ; i + + ) {
const SubC ompactionState : : Output & out = sub_compact . outputs [ i ] ;
const Subc ompactionState : : Output & out = sub_compact . outputs [ i ] ;
// If this file was inserted into the table cache then remove
// them here because this compaction was not committed.