@ -100,9 +100,6 @@ struct CompactionJob::SubCompactionState {
uint64_t total_bytes ;
uint64_t num_input_records ;
uint64_t num_output_records ;
SequenceNumber earliest_snapshot ;
SequenceNumber visible_at_tip ;
SequenceNumber latest_snapshot ;
CompactionJobStats compaction_job_stats ;
// "level_ptrs" holds indices that remember which file of an associated
@ -113,20 +110,15 @@ struct CompactionJob::SubCompactionState {
// is in or beyond the last file checked during the previous call
std : : vector < size_t > level_ptrs ;
SubCompactionState ( Compaction * c , Slice * _start , Slice * _end ,
SequenceNumber earliest , SequenceNumber visible ,
SequenceNumber latest )
: compaction ( c ) ,
start ( _start ) ,
end ( _end ) ,
outfile ( nullptr ) ,
builder ( nullptr ) ,
total_bytes ( 0 ) ,
num_input_records ( 0 ) ,
num_output_records ( 0 ) ,
earliest_snapshot ( earliest ) ,
visible_at_tip ( visible ) ,
latest_snapshot ( latest ) {
SubCompactionState ( Compaction * c , Slice * _start , Slice * _end )
: compaction ( c ) ,
start ( _start ) ,
end ( _end ) ,
outfile ( nullptr ) ,
builder ( nullptr ) ,
total_bytes ( 0 ) ,
num_input_records ( 0 ) ,
num_output_records ( 0 ) {
assert ( compaction ! = nullptr ) ;
level_ptrs = std : : vector < size_t > ( compaction - > number_levels ( ) , 0 ) ;
}
@ -146,9 +138,6 @@ struct CompactionJob::SubCompactionState {
total_bytes = std : : move ( o . total_bytes ) ;
num_input_records = std : : move ( o . num_input_records ) ;
num_output_records = std : : move ( o . num_output_records ) ;
earliest_snapshot = std : : move ( o . earliest_snapshot ) ;
visible_at_tip = std : : move ( o . visible_at_tip ) ;
latest_snapshot = std : : move ( o . latest_snapshot ) ;
level_ptrs = std : : move ( o . level_ptrs ) ;
return * this ;
}
@ -321,32 +310,29 @@ void CompactionJob::Prepare() {
bottommost_level_ = c - > bottommost_level ( ) ;
// Initialize subcompaction states
SequenceNumber earliest_snapshot ;
SequenceNumber latest_snapshot = 0 ;
SequenceNumber visible_at_tip = 0 ;
latest_snapshot_ = 0 ;
visible_at_tip_ = 0 ;
if ( existing_snapshots_ . size ( ) = = 0 ) {
// optimize for fast path if there are no snapshots
visible_at_tip = versions_ - > LastSequence ( ) ;
earliest_snapshot = visible_at_tip ;
visible_at_tip_ = versions_ - > LastSequence ( ) ;
earliest_snapshot_ = visible_at_tip_ ;
} else {
latest_snapshot = existing_snapshots_ . back ( ) ;
latest_snapshot_ = existing_snapshots_ . back ( ) ;
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
existing_snapshots_ . push_back ( versions_ - > LastSequence ( ) ) ;
earliest_snapshot = existing_snapshots_ [ 0 ] ;
earliest_snapshot_ = existing_snapshots_ [ 0 ] ;
}
InitializeSubCompactions ( earliest_snapshot , visible_at_tip , latest_snapshot ) ;
InitializeSubCompactions ( ) ;
}
// For L0-L1 compaction, iterators work in parallel by processing
// different subsets of the full key range. This function sets up
// the local states used by each of these subcompactions during
// their execution
void CompactionJob : : InitializeSubCompactions ( const SequenceNumber & earliest ,
const SequenceNumber & visible ,
const SequenceNumber & latest ) {
void CompactionJob : : InitializeSubCompactions ( ) {
Compaction * c = compact_ - > compaction ;
auto & bounds = sub_compaction_boundaries_ ;
if ( c - > IsSubCompaction ( ) ) {
@ -410,8 +396,7 @@ void CompactionJob::InitializeSubCompactions(const SequenceNumber& earliest,
for ( size_t i = 0 ; i < = bounds . size ( ) ; i + + ) {
Slice * start = i = = 0 ? nullptr : & bounds [ i - 1 ] ;
Slice * end = i = = bounds . size ( ) ? nullptr : & bounds [ i ] ;
compact_ - > sub_compact_states . emplace_back ( compact_ - > compaction , start ,
end , earliest , visible , latest ) ;
compact_ - > sub_compact_states . emplace_back ( compact_ - > compaction , start , end ) ;
}
}
@ -658,8 +643,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
visible_in_snapshot = kMaxSequenceNumber ;
// apply the compaction filter to the first occurrence of the user key
if ( compaction_filter & & ikey . type = = kTypeValue & &
( sub_compact - > visible_at_tip | |
ikey . sequence > sub_compact - > latest_snapshot ) ) {
( visible_at_tip_ | | ikey . sequence > latest_snapshot_ ) ) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
@ -695,9 +679,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
// the earlist snapshot that is affected by this kv.
SequenceNumber prev_snapshot = 0 ; // 0 means no previous snapshot
SequenceNumber visible =
sub_compact - > visible_at_tip
? sub_compact - > visible_at_tip
: findEarliestVisibleSnapshot ( ikey . sequence , & prev_snapshot ) ;
visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot (
ikey . sequence , & prev_snapshot ) ;
if ( visible_in_snapshot = = visible ) {
// If the earliest snapshot is which this key is visible in
@ -709,7 +692,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) {
+ + key_drop_newer_entry ;
input - > Next ( ) ; // (A)
} else if ( ikey . type = = kTypeDeletion & &
ikey . sequence < = sub_compact - > earliest_snapshot & &
ikey . sequence < = earliest_snapshot_ & &
sub_compact - > compaction - > KeyNotExistsBeyondOutputLevel (
ikey . user_key , & sub_compact - > level_ptrs ) ) {
// For this user key:
@ -815,7 +798,7 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if ( bottommost_level_ & & ikey . sequence < sub_compact - > earliest_snapshot & &
if ( bottommost_level_ & & ikey . sequence < earliest_snapshot_ & &
ikey . type ! = kTypeMerge ) {
assert ( ikey . type ! = kTypeDeletion ) ;
// make a copy because updating in place would cause problems