@@ -12,6 +12,7 @@
 #include <algorithm>
 #include <cinttypes>
 #include <memory>
+#include <optional>
 #include <set>
 #include <utility>
 #include <vector>
@ -30,6 +31,7 @@
# include "db/log_writer.h"
# include "db/merge_helper.h"
# include "db/range_del_aggregator.h"
# include "db/version_edit.h"
# include "db/version_set.h"
# include "file/filename.h"
# include "file/read_write_util.h"
@ -44,6 +46,7 @@
# include "port/port.h"
# include "rocksdb/db.h"
# include "rocksdb/env.h"
# include "rocksdb/options.h"
# include "rocksdb/statistics.h"
# include "rocksdb/status.h"
# include "rocksdb/table.h"
@@ -232,24 +235,22 @@ void CompactionJob::Prepare() {
   bottommost_level_ = c->bottommost_level();
 
   if (c->ShouldFormSubcompactions()) {
     {
       StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
       GenSubcompactionBoundaries();
     }
   }
   if (boundaries_.size() > 1) {
     for (size_t i = 0; i <= boundaries_.size(); i++) {
-      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
-      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
-      compact_->sub_compact_states.emplace_back(c, start, end,
-                                                static_cast<uint32_t>(i));
+      compact_->sub_compact_states.emplace_back(
+          c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
+          (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
+                                    : std::nullopt,
+          static_cast<uint32_t>(i));
     }
     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                       compact_->sub_compact_states.size());
   } else {
-    constexpr Slice* start = nullptr;
-    constexpr Slice* end = nullptr;
-    compact_->sub_compact_states.emplace_back(c, start, end, /*sub_job_id*/ 0);
+    compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt,
+                                              /*sub_job_id*/ 0);
   }
 
   if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
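// Aside: a minimal, self-contained sketch of the boundary-to-subcompaction
// expansion in Prepare() above, using std::string in place of rocksdb::Slice;
// SubRange and MakeSubRanges are hypothetical names for illustration only.
// N boundary keys always yield N + 1 ranges, with std::nullopt marking the
// unbounded first start and last end -- the same
// (i != 0) / (i != boundaries_.size()) logic as in the hunk above.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct SubRange {
  std::optional<std::string> start;  // std::nullopt => from keyspace start
  std::optional<std::string> end;    // std::nullopt => to keyspace end
};

std::vector<SubRange> MakeSubRanges(const std::vector<std::string>& boundaries) {
  std::vector<SubRange> ranges;
  for (size_t i = 0; i <= boundaries.size(); i++) {
    ranges.push_back(
        {(i != 0) ? std::optional<std::string>(boundaries[i - 1]) : std::nullopt,
         (i != boundaries.size()) ? std::optional<std::string>(boundaries[i])
                                  : std::nullopt});
  }
  return ranges;
}

int main() {
  // Two boundary keys ("g", "p") produce three ranges:
  // [-inf, g), [g, p), [p, +inf)
  for (const SubRange& r : MakeSubRanges({"g", "p"})) {
    std::cout << r.start.value_or("-inf") << " .. " << r.end.value_or("+inf")
              << "\n";
  }
  return 0;
}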
@@ -299,15 +300,48 @@ struct RangeWithSize {
 };
 
 void CompactionJob::GenSubcompactionBoundaries() {
+  // The goal is to find some boundary keys so that we can evenly partition
+  // the compaction input data into max_subcompactions ranges.
+  // For every input file, we ask TableReader to estimate 128 anchor points
+  // that evenly partition the input file into 128 ranges and the range
+  // sizes. This can be calculated by scanning index blocks of the file.
+  // Once we have the anchor points for all the input files, we merge them
+  // together and try to find keys dividing the ranges evenly.
+  // For example, if we have two input files, and each returns the following
+  // ranges:
+  //   File1: (a1, 1000), (b1, 1200), (c1, 1100)
+  //   File2: (a2, 1100), (b2, 1000), (c2, 1000)
+  // We total-sort the keys to the following:
+  //   (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
+  // We calculate the total size by adding up all ranges' sizes, which is 6400.
+  // If we would like to partition into 2 subcompactions, the target range
+  // size is 3200. Based on the sizes, we take "b1" as the partition key,
+  // since the cumulative size of the first three ranges crosses 3200.
+  //
+  // Note that the ranges actually overlap. For example, in the example
+  // above, the range ending with "b1" overlaps with the range ending with
+  // "b2", so the size 1000+1100+1200 is an underestimation of the data size
+  // up to "b1". In extreme cases where we only compact N L0 files, a range
+  // can overlap with N-1 other ranges. Since we requested a relatively large
+  // number (128) of ranges from each input file, even N-way overlap causes
+  // relatively small inaccuracy.
   auto* c = compact_->compaction;
+  if (c->max_subcompactions() <= 1) {
+    return;
+  }
+
   auto* cfd = c->column_family_data();
   const Comparator* cfd_comparator = cfd->user_comparator();
-  std::vector<Slice> bounds;
+  const InternalKeyComparator& icomp = cfd->internal_comparator();
+
+  auto* v = compact_->compaction->input_version();
+  int base_level = v->storage_info()->base_level();
+  InstrumentedMutexUnlock unlock_guard(db_mutex_);
+
+  uint64_t total_size = 0;
+  std::vector<TableReader::Anchor> all_anchors;
   int start_lvl = c->start_level();
   int out_lvl = c->output_level();
 
-  // Add the starting and/or ending key of certain input files as a potential
-  // boundary
   for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
     int lvl = c->level(lvl_idx);
     if (lvl >= start_lvl && lvl <= out_lvl) {
@@ -318,106 +352,57 @@ void CompactionJob::GenSubcompactionBoundaries() {
         continue;
       }
 
-      if (lvl == 0) {
-        // For level 0 add the starting and ending key of each file since the
-        // files may have greatly differing key ranges (not range-partitioned)
-        for (size_t i = 0; i < num_files; i++) {
-          bounds.emplace_back(flevel->files[i].smallest_key);
-          bounds.emplace_back(flevel->files[i].largest_key);
-        }
-      } else {
-        // For all other levels add the smallest/largest key in the level to
-        // encompass the range covered by that level
-        bounds.emplace_back(flevel->files[0].smallest_key);
-        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
-        if (lvl == out_lvl) {
-          // For the last level include the starting keys of all files since
-          // the last level is the largest and probably has the widest key
-          // range. Since it's range partitioned, the ending key of one file
-          // and the starting key of the next are very close (or identical).
-          for (size_t i = 1; i < num_files; i++) {
-            bounds.emplace_back(flevel->files[i].smallest_key);
-          }
-        }
-      }
+      for (size_t i = 0; i < num_files; i++) {
+        FileMetaData* f = flevel->files[i].file_metadata;
+        std::vector<TableReader::Anchor> my_anchors;
+        Status s = cfd->table_cache()->ApproximateKeyAnchors(
+            ReadOptions(), icomp, f->fd, my_anchors);
+        if (!s.ok() || my_anchors.empty()) {
+          my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
+        }
+        for (auto& ac : my_anchors) {
+          // Can be optimized to avoid this loop.
+          total_size += ac.range_size;
+        }
+
+        all_anchors.insert(all_anchors.end(), my_anchors.begin(),
+                           my_anchors.end());
+      }
     }
   }
 
-  std::sort(bounds.begin(), bounds.end(),
-            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
-              return cfd_comparator->Compare(ExtractUserKey(a),
-                                             ExtractUserKey(b)) < 0;
-            });
-
-  // Remove duplicated entries from bounds
-  bounds.erase(
-      std::unique(bounds.begin(), bounds.end(),
-                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
-                    return cfd_comparator->Compare(ExtractUserKey(a),
-                                                   ExtractUserKey(b)) == 0;
-                  }),
-      bounds.end());
-
-  // Combine consecutive pairs of boundaries into ranges with an approximate
-  // size of data covered by keys in that range
-  uint64_t sum = 0;
-  std::vector<RangeWithSize> ranges;
-  // Get input version from CompactionState since it's already referenced
-  // earlier in Compaction::SetInputVersion and will not change when
-  // db_mutex_ is released below
-  auto* v = compact_->compaction->input_version();
-  for (auto it = bounds.begin();;) {
-    const Slice a = *it;
-    ++it;
-    if (it == bounds.end()) {
-      break;
-    }
-    const Slice b = *it;
-
-    // ApproximateSize could potentially create a table reader iterator to
-    // seek to the index block and may incur I/O cost in the process. Unlock
-    // the db mutex to reduce contention
-    db_mutex_->Unlock();
-    uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
-                                               b, start_lvl, out_lvl + 1,
-                                               TableReaderCaller::kCompaction);
-    db_mutex_->Lock();
-
-    ranges.emplace_back(a, b, size);
-    sum += size;
-  }
-
-  // Group the ranges into subcompactions
-  const double min_file_fill_percent = 4.0 / 5;
-  int base_level = v->storage_info()->base_level();
-  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
-      sum / min_file_fill_percent /
-      MaxFileSizeForLevel(
-          *(c->mutable_cf_options()), out_lvl,
-          c->immutable_options()->compaction_style, base_level,
-          c->immutable_options()->level_compaction_dynamic_level_bytes)));
-  uint64_t subcompactions =
-      std::min({static_cast<uint64_t>(ranges.size()),
-                static_cast<uint64_t>(c->max_subcompactions()),
-                max_output_files});
-
-  if (subcompactions > 1) {
-    double mean = sum * 1.0 / subcompactions;
-    // Greedily add ranges to the subcompaction until the sum of the ranges'
-    // sizes becomes >= the expected mean size of a subcompaction
-    sum = 0;
-    for (size_t i = 0; i + 1 < ranges.size(); i++) {
-      sum += ranges[i].size;
-      if (subcompactions == 1) {
-        // If there's only one left to schedule then it goes to the end so no
-        // need to put an end boundary
-        continue;
-      }
-      if (sum >= mean) {
-        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
-        subcompactions--;
-        sum = 0;
-      }
-    }
-  }
+  // Here we total-sort all the anchor points across all files and go through
+  // them in the sorted order to find partitioning boundaries.
+  // Not the most efficient implementation. A much more efficient algorithm
+  // probably exists, but it would be more complex. If performance turns out
+  // to be a problem, we can optimize.
+  std::sort(
+      all_anchors.begin(), all_anchors.end(),
+      [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
+        return cfd_comparator->Compare(a.user_key, b.user_key) < 0;
+      });
+
+  uint64_t target_range_size = std::max(
+      total_size / static_cast<uint64_t>(c->max_subcompactions()),
+      MaxFileSizeForLevel(
+          *(c->mutable_cf_options()), out_lvl,
+          c->immutable_options()->compaction_style, base_level,
+          c->immutable_options()->level_compaction_dynamic_level_bytes));
+
+  if (target_range_size >= total_size) {
+    return;
+  }
+
+  uint64_t next_threshold = target_range_size;
+  uint64_t cumulative_size = 0;
+  for (TableReader::Anchor& anchor : all_anchors) {
+    cumulative_size += anchor.range_size;
+    if (cumulative_size > next_threshold) {
+      next_threshold += target_range_size;
+      boundaries_.push_back(anchor.user_key);
+    }
+    if (boundaries_.size() + 1 >= uint64_t{c->max_subcompactions()}) {
+      break;
+    }
+  }
 }
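// Aside: a runnable sketch of the anchor-based partitioning implemented by
// GenSubcompactionBoundaries() above, reusing the numbers from its comment
// block. This Anchor struct is a simplified stand-in for TableReader::Anchor;
// the fallback, locking, and MaxFileSizeForLevel details are omitted.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct Anchor {
  std::string user_key;
  uint64_t range_size;
};

int main() {
  // Merged anchors from File1 and File2, still in per-file order.
  std::vector<Anchor> all_anchors = {{"a1", 1000}, {"b1", 1200}, {"c1", 1100},
                                     {"a2", 1100}, {"b2", 1000}, {"c2", 1000}};
  // Total-sort by user key, as the diff does with cfd_comparator.
  std::sort(all_anchors.begin(), all_anchors.end(),
            [](const Anchor& a, const Anchor& b) {
              return a.user_key < b.user_key;
            });

  uint64_t total_size = 0;
  for (const Anchor& a : all_anchors) total_size += a.range_size;  // 6400

  const uint64_t max_subcompactions = 2;
  const uint64_t target_range_size = total_size / max_subcompactions;  // 3200

  // Walk anchors in key order; emit a boundary each time the cumulative size
  // crosses the next multiple of the target size.
  std::vector<std::string> boundaries;
  uint64_t next_threshold = target_range_size;
  uint64_t cumulative_size = 0;
  for (const Anchor& anchor : all_anchors) {
    cumulative_size += anchor.range_size;
    if (cumulative_size > next_threshold) {
      next_threshold += target_range_size;
      boundaries.push_back(anchor.user_key);
    }
    if (boundaries.size() + 1 >= max_subcompactions) break;
  }

  // Prints "b1": a1(1000) + a2(1100) + b1(1200) = 3300 crosses 3200.
  for (const std::string& b : boundaries) std::cout << b << "\n";
  return 0;
}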
@@ -885,8 +870,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   // TODO: since we already use C++17, should use
   // std::optional<const Slice> instead.
-  const Slice* const start = sub_compact->start;
-  const Slice* const end = sub_compact->end;
+  const std::optional<Slice> start = sub_compact->start;
+  const std::optional<Slice> end = sub_compact->end;
 
   ReadOptions read_options;
   read_options.verify_checksums = true;
@@ -900,19 +885,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   // Note: if we're going to support subcompactions for user-defined timestamps,
   // the timestamp part will have to be stripped from the bounds here.
-  assert((!start && !end) || cfd->user_comparator()->timestamp_size() == 0);
-  read_options.iterate_lower_bound = start;
-  read_options.iterate_upper_bound = end;
+  assert((!start.has_value() && !end.has_value()) ||
+         cfd->user_comparator()->timestamp_size() == 0);
+  if (start.has_value()) {
+    read_options.iterate_lower_bound = &start.value();
+  }
+  if (end.has_value()) {
+    read_options.iterate_upper_bound = &end.value();
+  }
 
   // Although the v2 aggregator is what the level iterator(s) know about,
   // the AddTombstones calls will be propagated down to the v1 aggregator.
   std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
       read_options, sub_compact->compaction, range_del_agg.get(),
-      file_options_for_read_,
-      (start == nullptr) ? std::optional<const Slice>{}
-                         : std::optional<const Slice>{*start},
-      (end == nullptr) ? std::optional<const Slice>{}
-                       : std::optional<const Slice>{*end}));
+      file_options_for_read_, start, end));
   InternalIterator* input = raw_input.get();
 
   IterKey start_ikey;
@@ -920,20 +906,21 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   Slice start_slice;
   Slice end_slice;
-  if (start) {
-    start_ikey.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
+  if (start.has_value()) {
+    start_ikey.SetInternalKey(start.value(), kMaxSequenceNumber,
+                              kValueTypeForSeek);
     start_slice = start_ikey.GetInternalKey();
   }
-  if (end) {
-    end_ikey.SetInternalKey(*end, kMaxSequenceNumber, kValueTypeForSeek);
+  if (end.has_value()) {
+    end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek);
     end_slice = end_ikey.GetInternalKey();
   }
 
   std::unique_ptr<InternalIterator> clip;
-  if (start || end) {
+  if (start.has_value() || end.has_value()) {
     clip = std::make_unique<ClippingIterator>(
-        raw_input.get(), start ? &start_slice : nullptr,
-        end ? &end_slice : nullptr, &cfd->internal_comparator());
+        raw_input.get(), start.has_value() ? &start_slice : nullptr,
+        end.has_value() ? &end_slice : nullptr, &cfd->internal_comparator());
     input = clip.get();
   }
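// Aside: a minimal sketch of the optional-to-pointer bound plumbing in the
// hunks above, assuming std::string keys; FakeReadOptions and BoundedScan are
// hypothetical stand-ins for ReadOptions and the compaction-iterator setup.
// The bound pointers are non-owning, which is why the diff keeps start/end
// (and the derived start_slice/end_slice) alive on the stack for the whole
// compaction iteration.
#include <iostream>
#include <optional>
#include <string>

struct FakeReadOptions {
  // Mirrors ReadOptions::iterate_lower_bound / iterate_upper_bound: raw,
  // non-owning pointers that stay nullptr when a bound is absent.
  const std::string* iterate_lower_bound = nullptr;
  const std::string* iterate_upper_bound = nullptr;
};

void BoundedScan(const std::optional<std::string>& start,
                 const std::optional<std::string>& end) {
  FakeReadOptions read_options;
  if (start.has_value()) {
    read_options.iterate_lower_bound = &start.value();
  }
  if (end.has_value()) {
    read_options.iterate_upper_bound = &end.value();
  }
  std::cout << (read_options.iterate_lower_bound
                    ? *read_options.iterate_lower_bound
                    : std::string("-inf"))
            << " .. "
            << (read_options.iterate_upper_bound
                    ? *read_options.iterate_upper_bound
                    : std::string("+inf"))
            << "\n";
}

int main() {
  BoundedScan(std::nullopt, std::string("p"));  // first subcompaction
  BoundedScan(std::string("p"), std::nullopt);  // last subcompaction
  return 0;
}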
@@ -1061,8 +1048,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
     // returns true.
-    assert(!end ||
-           cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
+    assert(!end.has_value() || cfd->user_comparator()->Compare(
+                                   c_iter->user_key(), end.value()) < 0);
 
     if (c_iter_stats.num_input_records % kRecordStatsEvery ==
         kRecordStatsEvery - 1) {
@@ -1280,10 +1267,12 @@ Status CompactionJob::FinishCompactionOutputFile(
   // output_to_penultimate_level compaction here, as it's only used to decide
   // if range dels could be dropped.
   if (outputs.HasRangeDel()) {
-    s = outputs.AddRangeDels(sub_compact->start, sub_compact->end,
-                             range_del_out_stats, bottommost_level_,
-                             cfd->internal_comparator(), earliest_snapshot,
-                             next_table_min_key);
+    s = outputs.AddRangeDels(
+        sub_compact->start.has_value() ? &(sub_compact->start.value())
+                                       : nullptr,
+        sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr,
+        range_del_out_stats, bottommost_level_, cfd->internal_comparator(),
+        earliest_snapshot, next_table_min_key);
   }
   RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
   TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
@@ -1595,16 +1584,16 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
   }
   uint64_t current_time = static_cast<uint64_t>(temp_current_time);
   InternalKey tmp_start, tmp_end;
-  if (sub_compact->start != nullptr) {
-    tmp_start.SetMinPossibleForUserKey(*(sub_compact->start));
+  if (sub_compact->start.has_value()) {
+    tmp_start.SetMinPossibleForUserKey(sub_compact->start.value());
   }
-  if (sub_compact->end != nullptr) {
-    tmp_end.SetMinPossibleForUserKey(*(sub_compact->end));
+  if (sub_compact->end.has_value()) {
+    tmp_end.SetMinPossibleForUserKey(sub_compact->end.value());
   }
   uint64_t oldest_ancester_time =
       sub_compact->compaction->MinInputFileOldestAncesterTime(
-          (sub_compact->start != nullptr) ? &tmp_start : nullptr,
-          (sub_compact->end != nullptr) ? &tmp_end : nullptr);
+          sub_compact->start.has_value() ? &tmp_start : nullptr,
+          sub_compact->end.has_value() ? &tmp_end : nullptr);
   if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
     oldest_ancester_time = current_time;
   }
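// Aside: the hunks in FinishCompactionOutputFile and OpenCompactionOutputFile
// repeat the same optional-to-raw-pointer adaptation at legacy call sites. A
// sketch of a helper that could factor it out (OptToPtr is a hypothetical
// name, not an existing RocksDB utility):
#include <cassert>
#include <optional>
#include <string>

template <typename T>
const T* OptToPtr(const std::optional<T>& opt) {
  // Valid only while `opt` outlives the returned pointer.
  return opt.has_value() ? &opt.value() : nullptr;
}

int main() {
  std::optional<std::string> start;     // unbounded start => nullptr
  std::optional<std::string> end("p");  // bounded end => pointer into `end`
  assert(OptToPtr(start) == nullptr);
  assert(OptToPtr(end) != nullptr && *OptToPtr(end) == "p");
  // Usage matching the pattern above, e.g.:
  //   s = outputs.AddRangeDels(OptToPtr(sub_compact->start),
  //                            OptToPtr(sub_compact->end), ...);
  return 0;
}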