@ -84,102 +84,8 @@ struct CompactionJob::CompactionState {
num_input_records ( 0 ) ,
num_input_records ( 0 ) ,
num_output_records ( 0 ) { }
num_output_records ( 0 ) { }
std : : vector < std : : string > key_str_buf_ ;
std : : vector < std : : string > existing_value_str_buf_ ;
// new_value_buf_ will only be appended if a value changes
std : : vector < std : : string > new_value_buf_ ;
// if values_changed_buf_[i] is true
// new_value_buf_ will add a new entry with the changed value
std : : vector < bool > value_changed_buf_ ;
// to_delete_buf_[i] is true iff key_buf_[i] is deleted
std : : vector < bool > to_delete_buf_ ;
std : : vector < std : : string > other_key_str_buf_ ;
std : : vector < std : : string > other_value_str_buf_ ;
std : : vector < Slice > combined_key_buf_ ;
std : : vector < Slice > combined_value_buf_ ;
std : : string cur_prefix_ ;
uint64_t num_input_records ;
uint64_t num_input_records ;
uint64_t num_output_records ;
uint64_t num_output_records ;
// Buffers the kv-pair that will be run through compaction filter V2
// in the future.
void BufferKeyValueSlices ( const Slice & key , const Slice & value ) {
key_str_buf_ . emplace_back ( key . ToString ( ) ) ;
existing_value_str_buf_ . emplace_back ( value . ToString ( ) ) ;
}
// Buffers the kv-pair that will not be run through compaction filter V2
// in the future.
void BufferOtherKeyValueSlices ( const Slice & key , const Slice & value ) {
other_key_str_buf_ . emplace_back ( key . ToString ( ) ) ;
other_value_str_buf_ . emplace_back ( value . ToString ( ) ) ;
}
// Add a kv-pair to the combined buffer
void AddToCombinedKeyValueSlices ( const Slice & key , const Slice & value ) {
// The real strings are stored in the batch buffers
combined_key_buf_ . emplace_back ( key ) ;
combined_value_buf_ . emplace_back ( value ) ;
}
// Merging the two buffers
void MergeKeyValueSliceBuffer ( const InternalKeyComparator * comparator ) {
size_t i = 0 ;
size_t j = 0 ;
size_t total_size = key_str_buf_ . size ( ) + other_key_str_buf_ . size ( ) ;
combined_key_buf_ . reserve ( total_size ) ;
combined_value_buf_ . reserve ( total_size ) ;
while ( i + j < total_size ) {
int comp_res = 0 ;
if ( i < key_str_buf_ . size ( ) & & j < other_key_str_buf_ . size ( ) ) {
comp_res = comparator - > Compare ( key_str_buf_ [ i ] , other_key_str_buf_ [ j ] ) ;
} else if ( i > = key_str_buf_ . size ( ) & & j < other_key_str_buf_ . size ( ) ) {
comp_res = 1 ;
} else if ( j > = other_key_str_buf_ . size ( ) & & i < key_str_buf_ . size ( ) ) {
comp_res = - 1 ;
}
if ( comp_res > 0 ) {
AddToCombinedKeyValueSlices ( other_key_str_buf_ [ j ] ,
other_value_str_buf_ [ j ] ) ;
j + + ;
} else if ( comp_res < 0 ) {
AddToCombinedKeyValueSlices ( key_str_buf_ [ i ] ,
existing_value_str_buf_ [ i ] ) ;
i + + ;
}
}
}
void CleanupBatchBuffer ( ) {
to_delete_buf_ . clear ( ) ;
key_str_buf_ . clear ( ) ;
existing_value_str_buf_ . clear ( ) ;
new_value_buf_ . clear ( ) ;
value_changed_buf_ . clear ( ) ;
to_delete_buf_ . shrink_to_fit ( ) ;
key_str_buf_ . shrink_to_fit ( ) ;
existing_value_str_buf_ . shrink_to_fit ( ) ;
new_value_buf_ . shrink_to_fit ( ) ;
value_changed_buf_ . shrink_to_fit ( ) ;
other_key_str_buf_ . clear ( ) ;
other_value_str_buf_ . clear ( ) ;
other_key_str_buf_ . shrink_to_fit ( ) ;
other_value_str_buf_ . shrink_to_fit ( ) ;
}
void CleanupMergedBuffer ( ) {
combined_key_buf_ . clear ( ) ;
combined_value_buf_ . clear ( ) ;
combined_key_buf_ . shrink_to_fit ( ) ;
combined_value_buf_ . shrink_to_fit ( ) ;
}
} ;
} ;
CompactionJob : : CompactionJob (
CompactionJob : : CompactionJob (
@ -271,8 +177,6 @@ void CompactionJob::ReportStartedCompaction(
void CompactionJob : : Prepare ( ) {
void CompactionJob : : Prepare ( ) {
AutoThreadOperationStageUpdater stage_updater (
AutoThreadOperationStageUpdater stage_updater (
ThreadStatus : : STAGE_COMPACTION_PREPARE ) ;
ThreadStatus : : STAGE_COMPACTION_PREPARE ) ;
compact_ - > CleanupBatchBuffer ( ) ;
compact_ - > CleanupMergedBuffer ( ) ;
// Generate file_levels_ for compaction berfore making Iterator
// Generate file_levels_ for compaction berfore making Iterator
ColumnFamilyData * cfd __attribute__ ( ( unused ) ) =
ColumnFamilyData * cfd __attribute__ ( ( unused ) ) =
@ -316,18 +220,8 @@ Status CompactionJob::Run() {
versions_ - > MakeInputIterator ( compact_ - > compaction ) ) ;
versions_ - > MakeInputIterator ( compact_ - > compaction ) ) ;
input - > SeekToFirst ( ) ;
input - > SeekToFirst ( ) ;
std : : unique_ptr < CompactionFilterV2 > compaction_filter_from_factory_v2 =
compact_ - > compaction - > CreateCompactionFilterV2 ( ) ;
auto compaction_filter_v2 = compaction_filter_from_factory_v2 . get ( ) ;
Status status ;
int64_t imm_micros = 0 ; // Micros spent doing imm_ compactions
int64_t imm_micros = 0 ; // Micros spent doing imm_ compactions
if ( ! compaction_filter_v2 ) {
auto status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) ) ;
status = ProcessKeyValueCompaction ( & imm_micros , input . get ( ) , false ) ;
} else {
status = ProcessPrefixBatches ( cfd , & imm_micros , input . get ( ) ,
compaction_filter_v2 ) ;
}
if ( status . ok ( ) & &
if ( status . ok ( ) & &
( shutting_down_ - > load ( std : : memory_order_acquire ) | | cfd - > IsDropped ( ) ) ) {
( shutting_down_ - > load ( std : : memory_order_acquire ) | | cfd - > IsDropped ( ) ) ) {
@ -418,141 +312,10 @@ void CompactionJob::Install(Status* status,
CleanupCompaction ( * status ) ;
CleanupCompaction ( * status ) ;
}
}
Status CompactionJob : : ProcessPrefixBatches (
ColumnFamilyData * cfd ,
int64_t * imm_micros ,
Iterator * input ,
CompactionFilterV2 * compaction_filter_v2 ) {
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
ParsedInternalKey ikey ;
Status status ;
bool prefix_initialized = false ;
shared_ptr < Iterator > backup_input (
versions_ - > MakeInputIterator ( compact_ - > compaction ) ) ;
backup_input - > SeekToFirst ( ) ;
uint64_t total_filter_time = 0 ;
while ( backup_input - > Valid ( ) & &
! shutting_down_ - > load ( std : : memory_order_acquire ) & &
! cfd - > IsDropped ( ) ) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros + = yield_callback_ ( ) ;
Slice key = backup_input - > key ( ) ;
Slice value = backup_input - > value ( ) ;
if ( ! ParseInternalKey ( key , & ikey ) ) {
// log error
Log ( InfoLogLevel : : WARN_LEVEL , db_options_ . info_log ,
" [%s] [JOB %d] Failed to parse key: %s " , cfd - > GetName ( ) . c_str ( ) ,
job_id_ , key . ToString ( ) . c_str ( ) ) ;
continue ;
} else {
const SliceTransform * transformer =
cfd - > ioptions ( ) - > compaction_filter_factory_v2 - > GetPrefixExtractor ( ) ;
const auto key_prefix = transformer - > Transform ( ikey . user_key ) ;
if ( ! prefix_initialized ) {
compact_ - > cur_prefix_ = key_prefix . ToString ( ) ;
prefix_initialized = true ;
}
// If the prefix remains the same, keep buffering
if ( key_prefix . compare ( Slice ( compact_ - > cur_prefix_ ) ) = = 0 ) {
// Apply the compaction filter V2 to all the kv pairs sharing
// the same prefix
if ( ikey . type = = kTypeValue & &
( visible_at_tip_ | | ikey . sequence > latest_snapshot_ ) ) {
// Buffer all keys sharing the same prefix for CompactionFilterV2
// Iterate through keys to check prefix
compact_ - > BufferKeyValueSlices ( key , value ) ;
} else {
// buffer ineligible keys
compact_ - > BufferOtherKeyValueSlices ( key , value ) ;
}
backup_input - > Next ( ) ;
continue ;
// finish changing values for eligible keys
} else {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if ( compact_ - > key_str_buf_ . size ( ) > 0 ) {
uint64_t time = 0 ;
CallCompactionFilterV2 ( compaction_filter_v2 , & time ) ;
total_filter_time + = time ;
}
compact_ - > cur_prefix_ = key_prefix . ToString ( ) ;
}
}
// Merge this batch of data (values + ineligible keys)
compact_ - > MergeKeyValueSliceBuffer ( & cfd - > internal_comparator ( ) ) ;
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction ( imm_micros , input , true ) ;
if ( ! status . ok ( ) ) {
break ;
}
// After writing the kv-pairs, we can safely remove the reference
// to the string buffer and clean them up
compact_ - > CleanupBatchBuffer ( ) ;
compact_ - > CleanupMergedBuffer ( ) ;
// Buffer the key that triggers the mismatch in prefix
if ( ikey . type = = kTypeValue & &
( visible_at_tip_ | | ikey . sequence > latest_snapshot_ ) ) {
compact_ - > BufferKeyValueSlices ( key , value ) ;
} else {
compact_ - > BufferOtherKeyValueSlices ( key , value ) ;
}
backup_input - > Next ( ) ;
if ( ! backup_input - > Valid ( ) ) {
// If this is the single last value, we need to merge it.
if ( compact_ - > key_str_buf_ . size ( ) > 0 ) {
uint64_t time = 0 ;
CallCompactionFilterV2 ( compaction_filter_v2 , & time ) ;
total_filter_time + = time ;
}
compact_ - > MergeKeyValueSliceBuffer ( & cfd - > internal_comparator ( ) ) ;
status = ProcessKeyValueCompaction ( imm_micros , input , true ) ;
if ( ! status . ok ( ) ) {
break ;
}
compact_ - > CleanupBatchBuffer ( ) ;
compact_ - > CleanupMergedBuffer ( ) ;
}
} // done processing all prefix batches
// finish the last batch
if ( status . ok ( ) ) {
if ( compact_ - > key_str_buf_ . size ( ) > 0 ) {
uint64_t time = 0 ;
CallCompactionFilterV2 ( compaction_filter_v2 , & time ) ;
total_filter_time + = time ;
}
compact_ - > MergeKeyValueSliceBuffer ( & cfd - > internal_comparator ( ) ) ;
status = ProcessKeyValueCompaction ( imm_micros , input , true ) ;
}
RecordTick ( stats_ , FILTER_OPERATION_TOTAL_TIME , total_filter_time ) ;
return status ;
}
Status CompactionJob : : ProcessKeyValueCompaction ( int64_t * imm_micros ,
Status CompactionJob : : ProcessKeyValueCompaction ( int64_t * imm_micros ,
Iterator * input ,
Iterator * input ) {
bool is_compaction_v2 ) {
AutoThreadOperationStageUpdater stage_updater (
AutoThreadOperationStageUpdater stage_updater (
ThreadStatus : : STAGE_COMPACTION_PROCESS_KV ) ;
ThreadStatus : : STAGE_COMPACTION_PROCESS_KV ) ;
size_t combined_idx = 0 ;
Status status ;
Status status ;
std : : string compaction_filter_value ;
std : : string compaction_filter_value ;
ParsedInternalKey ikey ;
ParsedInternalKey ikey ;
@ -599,26 +362,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// on other column families, too
// on other column families, too
( * imm_micros ) + = yield_callback_ ( ) ;
( * imm_micros ) + = yield_callback_ ( ) ;
Slice key ;
Slice key = input - > key ( ) ;
Slice value ;
Slice value = input - > value ( ) ;
// If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.
// This prefix batch should contain results after calling
// compaction_filter_v2.
//
// If is_compaction_v2 is off, this function will go through all the
// kv-pairs in input.
if ( ! is_compaction_v2 ) {
key = input - > key ( ) ;
value = input - > value ( ) ;
} else {
if ( combined_idx > = compact_ - > combined_key_buf_ . size ( ) ) {
break ;
}
key = compact_ - > combined_key_buf_ [ combined_idx ] ;
value = compact_ - > combined_value_buf_ [ combined_idx ] ;
+ + combined_idx ;
}
if ( compaction_job_stats_ ! = nullptr ) {
if ( compaction_job_stats_ ! = nullptr ) {
compaction_job_stats_ - > total_input_raw_key_bytes + =
compaction_job_stats_ - > total_input_raw_key_bytes + =
@ -660,7 +405,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
last_sequence_for_key = kMaxSequenceNumber ;
last_sequence_for_key = kMaxSequenceNumber ;
visible_in_snapshot = kMaxSequenceNumber ;
visible_in_snapshot = kMaxSequenceNumber ;
// apply the compaction filter to the first occurrence of the user key
// apply the compaction filter to the first occurrence of the user key
if ( compaction_filter & & ! is_compaction_v2 & & ikey . type = = kTypeValue & &
if ( compaction_filter & & ikey . type = = kTypeValue & &
( visible_at_tip_ | | ikey . sequence > latest_snapshot_ ) ) {
( visible_at_tip_ | | ikey . sequence > latest_snapshot_ ) ) {
// If the user has specified a compaction filter and the sequence
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// number is greater than any external snapshot, then invoke the
@ -738,11 +483,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
// object to minimize change to the existing flow. Turn out this
// object to minimize change to the existing flow. Turn out this
// logic could also be nicely re-used for memtable flush purge
// logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable.
// optimization in BuildTable.
int steps = 0 ;
merge . MergeUntil ( input , prev_snapshot , bottommost_level_ ,
merge . MergeUntil ( input , prev_snapshot , bottommost_level_ ,
db_options_ . statistics . get ( ) , & steps , env_ ) ;
db_options_ . statistics . get ( ) , nullptr , env_ ) ;
// Skip the Merge ops
combined_idx = combined_idx - 1 + steps ;
current_entry_is_merging = true ;
current_entry_is_merging = true ;
if ( merge . IsSuccess ( ) ) {
if ( merge . IsSuccess ( ) ) {
@ -899,70 +641,6 @@ void CompactionJob::RecordDroppedKeys(
}
}
}
}
void CompactionJob : : CallCompactionFilterV2 (
CompactionFilterV2 * compaction_filter_v2 , uint64_t * time ) {
if ( compact_ = = nullptr | | compaction_filter_v2 = = nullptr ) {
return ;
}
AutoThreadOperationStageUpdater stage_updater (
ThreadStatus : : STAGE_COMPACTION_FILTER_V2 ) ;
// Assemble slice vectors for user keys and existing values.
// We also keep track of our parsed internal key structs because
// we may need to access the sequence number in the event that
// keys are garbage collected during the filter process.
std : : vector < ParsedInternalKey > ikey_buf ;
std : : vector < Slice > user_key_buf ;
std : : vector < Slice > existing_value_buf ;
for ( const auto & key : compact_ - > key_str_buf_ ) {
ParsedInternalKey ikey ;
ParseInternalKey ( Slice ( key ) , & ikey ) ;
ikey_buf . emplace_back ( ikey ) ;
user_key_buf . emplace_back ( ikey . user_key ) ;
}
for ( const auto & value : compact_ - > existing_value_str_buf_ ) {
existing_value_buf . emplace_back ( Slice ( value ) ) ;
}
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
StopWatchNano timer ( env_ , stats_ ! = nullptr ) ;
compact_ - > to_delete_buf_ = compaction_filter_v2 - > Filter (
compact_ - > compaction - > level ( ) , user_key_buf , existing_value_buf ,
& compact_ - > new_value_buf_ , & compact_ - > value_changed_buf_ ) ;
* time = timer . ElapsedNanos ( ) ;
// new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all
// kv-pairs in this compaction run needs to be deleted.
assert ( compact_ - > to_delete_buf_ . size ( ) = = compact_ - > key_str_buf_ . size ( ) ) ;
assert ( compact_ - > to_delete_buf_ . size ( ) = =
compact_ - > existing_value_str_buf_ . size ( ) ) ;
assert ( compact_ - > value_changed_buf_ . empty ( ) | |
compact_ - > to_delete_buf_ . size ( ) = =
compact_ - > value_changed_buf_ . size ( ) ) ;
int new_value_idx = 0 ;
for ( unsigned int i = 0 ; i < compact_ - > to_delete_buf_ . size ( ) ; + + i ) {
if ( compact_ - > to_delete_buf_ [ i ] ) {
// update the string buffer directly
// the Slice buffer points to the updated buffer
UpdateInternalKey ( & compact_ - > key_str_buf_ [ i ] , ikey_buf [ i ] . sequence ,
kTypeDeletion ) ;
// no value associated with delete
compact_ - > existing_value_str_buf_ [ i ] . clear ( ) ;
RecordTick ( stats_ , COMPACTION_KEY_DROP_USER ) ;
} else if ( ! compact_ - > value_changed_buf_ . empty ( ) & &
compact_ - > value_changed_buf_ [ i ] ) {
compact_ - > existing_value_str_buf_ [ i ] =
compact_ - > new_value_buf_ [ new_value_idx + + ] ;
}
} // for
}
Status CompactionJob : : FinishCompactionOutputFile ( const Status & input_status ) {
Status CompactionJob : : FinishCompactionOutputFile ( const Status & input_status ) {
AutoThreadOperationStageUpdater stage_updater (
AutoThreadOperationStageUpdater stage_updater (
ThreadStatus : : STAGE_COMPACTION_SYNC_FILE ) ;
ThreadStatus : : STAGE_COMPACTION_SYNC_FILE ) ;