@@ -210,29 +210,15 @@ Status CompactionJob::Run() {
                                 ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  auto* compaction = compact_->compaction;
  LogCompaction(cfd, compaction);
  LogCompaction(compaction->column_family_data(), compaction);
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  const uint64_t start_micros = env_->NowMicros();
  std::unique_ptr<Iterator> input(
      versions_->MakeInputIterator(compact_->compaction));
  input->SeekToFirst();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  std::unique_ptr<Iterator> input(versions_->MakeInputIterator(compaction));
  input->SeekToFirst();
  auto status = ProcessKeyValueCompaction(&imm_micros, input.get());
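  // The shutdown / column-family-drop check, the FinishCompactionOutputFile()
  // call and the input->status() check below now run at the end of
  // ProcessKeyValueCompaction() (see the additions near the end of this diff).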
  if (status.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
    status = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during compaction");
  }
  if (status.ok() && compact_->builder != nullptr) {
    status = FinishCompactionOutputFile(input->status());
  }
  if (status.ok()) {
    status = input->status();
  }
  input.reset();
  if (output_directory_ && !db_options_.disableDataSync) {
@@ -331,7 +317,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
                     false /* internal key corruption is expected */);
  auto compaction_filter = cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (!compaction_filter) {
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
        compact_->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
@@ -346,6 +332,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
  StopWatchNano timer(env_, stats_ != nullptr);
  uint64_t total_filter_time = 0;
  // TODO(noetzli): check whether we could check !shutting_down_->... only
  // occasionally (see diff D42687)
  while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
         !cfd->IsDropped() && status.ok()) {
    compact_->num_input_records++;
@@ -360,10 +349,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
    Slice value = input->value();
    if (compaction_job_stats_ != nullptr) {
      compaction_job_stats_->total_input_raw_key_bytes +=
          input->key().size();
      compaction_job_stats_->total_input_raw_value_bytes +=
          input->value().size();
      compaction_job_stats_->total_input_raw_key_bytes += key.size();
      compaction_job_stats_->total_input_raw_value_bytes += value.size();
    }
    if (compact_->compaction->ShouldStopBefore(key) &&
@@ -375,8 +362,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
    }
    // Handle key/value, add to state, etc.
    bool drop = false;
    bool current_entry_is_merging = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // TODO: error key stays in db forever? Figure out the intention/rationale
@@ -385,177 +370,161 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
      visible_in_snapshot = kMaxSequenceNumber;
    } else {
      if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
        compaction_job_stats_->num_input_deletion_records++;
      }
      if (!has_current_user_key ||
          cfd->user_comparator()->Compare(ikey.user_key,
                                          current_user_key.GetKey()) != 0) {
        // First occurrence of this user key
        current_user_key.SetKey(ikey.user_key);
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;
        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter && ikey.type == kTypeValue &&
            (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          if (stats_ != nullptr) {
            timer.Start();
          }
          bool to_delete = compaction_filter->Filter(
              compact_->compaction->level(), ikey.user_key, value,
              &compaction_filter_value, &value_changed);
          total_filter_time += timer.ElapsedNanos();
          if (to_delete) {
            // make a copy of the original key and convert it to a delete
            delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
                                      kTypeDeletion);
            // anchor the key again
            key = delete_key.GetKey();
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            // no value associated with delete
            value.clear();
            ++key_drop_user;
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }
      if (compaction_job_stats_ != nullptr) {
        compaction_job_stats_->num_corrupt_keys++;
      }
      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find
      // the earliest snapshot that is affected by this kv.
      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
      SequenceNumber visible =
          visible_at_tip_
              ? visible_at_tip_
              : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
                                            &prev_snapshot);
      if (visible_in_snapshot == visible) {
        // If the earliest snapshot in which this key is visible
        // is the same as the visibility of a previous instance of the
        // same key, then this kv is not visible in any snapshot.
        // Hidden by a newer entry for the same user key
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        drop = true;  // (A)
        ++key_drop_newer_entry;
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= earliest_snapshot_ &&
                 compact_->compaction->KeyNotExistsBeyondOutputLevel(
                     ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
        ++key_drop_obsolete;
      } else if (ikey.type == kTypeMerge) {
        if (!merge.HasOperator()) {
          LogToBuffer(log_buffer_, "Options::merge_operator is null.");
          status = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          break;
          status = WriteKeyValue(key, value, ikey, input->status());
          input->Next();
          continue;
        }
      if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
        compaction_job_stats_->num_input_deletion_records++;
      }
      if (!has_current_user_key ||
          cfd->user_comparator()->Compare(ikey.user_key,
                                          current_user_key.GetKey()) != 0) {
        // First occurrence of this user key
        current_user_key.SetKey(ikey.user_key);
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;
        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter && ikey.type == kTypeValue &&
            (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          if (stats_ != nullptr) {
            timer.Start();
          }
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A)
        // We encapsulate the merge-related state machine in a different
        // object to minimize change to the existing flow. Turns out this
        // logic could also be nicely re-used for memtable flush purge
        // optimization in BuildTable.
        merge.MergeUntil(input, prev_snapshot, bottommost_level_,
                         db_options_.statistics.get(), nullptr, env_);
        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          bool to_delete = compaction_filter->Filter(
              compact_->compaction->level(), ikey.user_key, value,
              &compaction_filter_value, &value_changed);
          total_filter_time += timer.ElapsedNanos();
          if (to_delete) {
            // make a copy of the original key and convert it to a delete
            delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
                                      kTypeDeletion);
            // anchor the key again
            key = delete_key.GetKey();
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            value = merge.value();
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          assert(!merge.keys().empty());
          assert(merge.keys().size() == merge.values().size());
          // Hack to make sure last_sequence_for_key is correct
          ParseInternalKey(merge.keys().front(), &ikey);
            // no value associated with delete
            value.clear();
            ++key_drop_user;
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }
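        // A typical user-provided filter (for example the TTL utility's
        // compaction filter) returns true above for expired entries, which
        // converts them into deletion markers.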
      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }
    if (!drop) {
      // We may write a single key (e.g.: for Put/Delete or successful merge).
      // Or we may instead have to write a sequence/list of keys.
      // We have to write a sequence iff we have an unsuccessful merge
      if (current_entry_is_merging && !merge.IsSuccess()) {
      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find
      // the earliest snapshot that is affected by this kv.
      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
      SequenceNumber visible =
          visible_at_tip_
              ? visible_at_tip_
              : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
                                            &prev_snapshot);
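      // Assuming findEarliestVisibleSnapshot's usual semantics: with existing
      // snapshots {5, 10} and ikey.sequence == 7, visible becomes 10 (the
      // earliest snapshot that can see this entry) and prev_snapshot becomes 5.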
      if (visible_in_snapshot == visible) {
        // If the earliest snapshot in which this key is visible
        // is the same as the visibility of a previous instance of the
        // same key, then this kv is not visible in any snapshot.
        // Hidden by a newer entry for the same user key
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        ++key_drop_newer_entry;
        input->Next();  // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= earliest_snapshot_ &&
                 compact_->compaction->KeyNotExistsBeyondOutputLevel(
                     ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
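        // Example (hypothetical values): a deletion at sequence 90 for a user
        // key that does not exist beyond the output level, with
        // earliest_snapshot_ == 100, satisfies all three conditions and its
        // tombstone is simply skipped here.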
        ++key_drop_obsolete;
        input->Next();
      } else if (ikey.type == kTypeMerge) {
        if (!merge.HasOperator()) {
          LogToBuffer(log_buffer_, "Options::merge_operator is null.");
          status = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          break;
        }
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A)
        // We encapsulate the merge-related state machine in a different
        // object to minimize change to the existing flow. Turns out this
        // logic could also be nicely re-used for memtable flush purge
        // optimization in BuildTable.
        merge.MergeUntil(input, prev_snapshot, bottommost_level_,
                         db_options_.statistics.get(), env_);
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          ParseInternalKey(key, &ikey);
          status = WriteKeyValue(key, merge.value(), ikey, input->status());
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          const auto& keys = merge.keys();
          const auto& values = merge.values();
          std::deque<std::string>::const_reverse_iterator key_iter =
              keys.rbegin();  // The back (*rbegin()) is the first key
          std::deque<std::string>::const_reverse_iterator value_iter =
              values.rbegin();
          key = Slice(*key_iter);
          value = Slice(*value_iter);
          // We have a list of keys to write, traverse the list.
          while (true) {
            status = WriteKeyValue(key, value, ikey, input->status());
            if (!status.ok()) {
              break;
            }
            ++key_iter;
            ++value_iter;
            assert(!keys.empty());
            assert(keys.size() == values.size());
            // If at end of list
            if (key_iter == keys.rend() || value_iter == values.rend()) {
              // Sanity Check: if one ends, then both end
              assert(key_iter == keys.rend() && value_iter == values.rend());
              break;
            }
            // Otherwise not at end of list. Update key, value, and ikey.
          // We have a list of keys to write, write all keys in the list.
          for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
               status.ok() && key_iter != keys.rend();
               key_iter++, value_iter++) {
            key = Slice(*key_iter);
            value = Slice(*value_iter);
            ParseInternalKey(key, &ikey);
            status = WriteKeyValue(key, value, ikey, input->status());
          }
        } else {
          // There is only one item to be written out
          status = WriteKeyValue(key, value, ikey, input->status());
        }
      }  // if (!drop)
      // MergeUntil has moved input to the next entry
      if (!current_entry_is_merging) {
      } else {
        status = WriteKeyValue(key, value, ikey, input->status());
        input->Next();
      }
      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }
  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
  RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
  RecordCompactionIOStats();
  if (status.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
    status = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during compaction");
  }
  if (status.ok() && compact_->builder != nullptr) {
    status = FinishCompactionOutputFile(input->status());
  }
  if (status.ok()) {
    status = input->status();
  }
  return status;
}
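For reference, the body of the new WriteKeyValue helper is not part of the hunks above. A minimal sketch of what it plausibly does, inferred from its call sites and from the existing FinishCompactionOutputFile helper, is shown below; the exact signature and the names OpenCompactionOutputFile, current_output(), num_output_records and max_output_file_size() are assumptions, not lines from this change.

Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
                                    const ParsedInternalKey& ikey,
                                    const Status& input_status) {
  // NOTE: sketch only. OpenCompactionOutputFile, current_output(),
  // num_output_records and max_output_file_size() are assumed helpers/members.
  // Open a new output file if none is currently being built.
  if (compact_->builder == nullptr) {
    Status open_status = OpenCompactionOutputFile();
    if (!open_status.ok()) {
      return open_status;
    }
  }
  // Track the key range of the current output file and append the entry.
  if (compact_->builder->NumEntries() == 0) {
    compact_->current_output()->smallest.DecodeFrom(key);
  }
  compact_->current_output()->largest.DecodeFrom(key);
  compact_->builder->Add(key, value);
  compact_->num_output_records++;
  // Close the output file if it is big enough.
  Status status;
  if (compact_->builder->FileSize() >=
      compact_->compaction->max_output_file_size()) {
    status = FinishCompactionOutputFile(input_status);
  }
  return status;
}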