@ -221,39 +221,43 @@ void CompactionIterator::Next() {
bool CompactionIterator : : InvokeFilterIfNeeded ( bool * need_skip ,
bool CompactionIterator : : InvokeFilterIfNeeded ( bool * need_skip ,
Slice * skip_until ) {
Slice * skip_until ) {
if ( ! compaction_filter_ ) {
return true ;
}
// TODO: support compaction filter for wide-column entities
// TODO: support compaction filter for wide-column entities
if ( ! compaction_filter_ | |
if ( ikey_ . type ! = kTypeValue & & ikey_ . type ! = kTypeBlobIndex ) {
( ikey_ . type ! = kTypeValue & & ikey_ . type ! = kTypeBlobIndex ) ) {
return true ;
return true ;
}
}
bool error = false ;
// If the user has specified a compaction filter and the sequence
CompactionFilter : : Decision decision =
// number is greater than any external snapshot, then invoke the
CompactionFilter : : Decision : : kUndetermined ;
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
CompactionFilter : : Decision filter = CompactionFilter : : Decision : : kUndetermined ;
compaction_filter_value_ . clear ( ) ;
compaction_filter_skip_until_ . Clear ( ) ;
CompactionFilter : : ValueType value_type =
CompactionFilter : : ValueType value_type =
ikey_ . type = = kTypeValue ? CompactionFilter : : ValueType : : kValue
ikey_ . type = = kTypeValue ? CompactionFilter : : ValueType : : kValue
: CompactionFilter : : ValueType : : kBlobIndex ;
: CompactionFilter : : ValueType : : kBlobIndex ;
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
// to get sequence number.
assert ( compaction_filter_ ) ;
assert ( compaction_filter_ ) ;
Slice & filter_key =
const Slice & filter_key =
( ikey_ . type = = kTypeValue | |
( ikey_ . type ! = kTypeBlobIndex | |
! compaction_filter_ - > IsStackedBlobDbInternalCompactionFilter ( ) )
! compaction_filter_ - > IsStackedBlobDbInternalCompactionFilter ( ) )
? ikey_ . user_key
? ikey_ . user_key
: key_ ;
: key_ ;
compaction_filter_value_ . clear ( ) ;
compaction_filter_skip_until_ . Clear ( ) ;
{
{
StopWatchNano timer ( clock_ , report_detailed_time_ ) ;
StopWatchNano timer ( clock_ , report_detailed_time_ ) ;
if ( kTypeBlobIndex = = ikey_ . type ) {
filter = compaction_filter_ - > FilterBlobByKey (
if ( ikey_ . type = = kTypeBlobIndex ) {
decision = compaction_filter_ - > FilterBlobByKey (
level_ , filter_key , & compaction_filter_value_ ,
level_ , filter_key , & compaction_filter_value_ ,
compaction_filter_skip_until_ . rep ( ) ) ;
compaction_filter_skip_until_ . rep ( ) ) ;
if ( CompactionFilter : : Decision : : kUndetermined = = filter & &
if ( decision = = CompactionFilter : : Decision : : kUndetermined & &
! compaction_filter_ - > IsStackedBlobDbInternalCompactionFilter ( ) ) {
! compaction_filter_ - > IsStackedBlobDbInternalCompactionFilter ( ) ) {
if ( compaction_ = = nullptr ) {
if ( ! compaction_ ) {
status_ =
status_ =
Status : : Corruption ( " Unexpected blob index outside of compaction " ) ;
Status : : Corruption ( " Unexpected blob index outside of compaction " ) ;
validity_info_ . Invalidate ( ) ;
validity_info_ . Invalidate ( ) ;
@ -299,17 +303,18 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
value_type = CompactionFilter : : ValueType : : kValue ;
value_type = CompactionFilter : : ValueType : : kValue ;
}
}
}
}
if ( CompactionFilter : : Decision : : kUndetermined = = filter ) {
if ( decision = = CompactionFilter : : Decision : : kUndetermined ) {
filter = compaction_filter_ - > FilterV2 (
decision = compaction_filter_ - > FilterV2 (
level_ , filter_key , value_type ,
level_ , filter_key , value_type ,
blob_value_ . empty ( ) ? value_ : blob_value_ , & compaction_filter_value_ ,
blob_value_ . empty ( ) ? value_ : blob_value_ , & compaction_filter_value_ ,
compaction_filter_skip_until_ . rep ( ) ) ;
compaction_filter_skip_until_ . rep ( ) ) ;
}
}
iter_stats_ . total_filter_time + =
iter_stats_ . total_filter_time + =
env_ ! = nullptr & & report_detailed_time_ ? timer . ElapsedNanos ( ) : 0 ;
env_ ! = nullptr & & report_detailed_time_ ? timer . ElapsedNanos ( ) : 0 ;
}
}
if ( CompactionFilter : : Decision : : kUndetermined = = filter ) {
if ( decision = = CompactionFilter : : Decision : : kUndetermined ) {
// Should not reach here, since FilterV2 should never return kUndetermined.
// Should not reach here, since FilterV2 should never return kUndetermined.
status_ =
status_ =
Status : : NotSupported ( " FilterV2() should never return kUndetermined " ) ;
Status : : NotSupported ( " FilterV2() should never return kUndetermined " ) ;
@ -317,15 +322,15 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
return false ;
return false ;
}
}
if ( filter = = CompactionFilter : : Decision : : kRemoveAndSkipUntil & &
if ( decision = = CompactionFilter : : Decision : : kRemoveAndSkipUntil & &
cmp_ - > Compare ( * compaction_filter_skip_until_ . rep ( ) , ikey_ . user_key ) < =
cmp_ - > Compare ( * compaction_filter_skip_until_ . rep ( ) , ikey_ . user_key ) < =
0 ) {
0 ) {
// Can't skip to a key smaller than the current one.
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter : : Decision : : kKeep ;
decision = CompactionFilter : : Decision : : kKeep ;
}
}
if ( filter = = CompactionFilter : : Decision : : kRemove ) {
if ( decision = = CompactionFilter : : Decision : : kRemove ) {
// convert the current key to a delete; key_ is pointing into
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
// current_key_ at this point, so updating current_key_ updates key()
ikey_ . type = kTypeDeletion ;
ikey_ . type = kTypeDeletion ;
@ -333,7 +338,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
// no value associated with delete
// no value associated with delete
value_ . clear ( ) ;
value_ . clear ( ) ;
iter_stats_ . num_record_drop_user + + ;
iter_stats_ . num_record_drop_user + + ;
} else if ( filter = = CompactionFilter : : Decision : : kPurge ) {
} else if ( decision = = CompactionFilter : : Decision : : kPurge ) {
// convert the current key to a single delete; key_ is pointing into
// convert the current key to a single delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
// current_key_ at this point, so updating current_key_ updates key()
ikey_ . type = kTypeSingleDeletion ;
ikey_ . type = kTypeSingleDeletion ;
@ -341,19 +346,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
// no value associated with single delete
// no value associated with single delete
value_ . clear ( ) ;
value_ . clear ( ) ;
iter_stats_ . num_record_drop_user + + ;
iter_stats_ . num_record_drop_user + + ;
} else if ( filter = = CompactionFilter : : Decision : : kChangeValue ) {
} else if ( decision = = CompactionFilter : : Decision : : kChangeValue ) {
if ( ikey_ . type = = kTypeBlobIndex ) {
if ( ikey_ . type ! = kTypeValue ) {
// value transfer from blob file to inlined data
ikey_ . type = kTypeValue ;
ikey_ . type = kTypeValue ;
current_key_ . UpdateInternalKey ( ikey_ . sequence , ikey_ . typ e) ;
current_key_ . UpdateInternalKey ( ikey_ . sequence , kTypeValu e) ;
}
}
value_ = compaction_filter_value_ ;
value_ = compaction_filter_value_ ;
} else if ( filter = = CompactionFilter : : Decision : : kRemoveAndSkipUntil ) {
} else if ( decision = = CompactionFilter : : Decision : : kRemoveAndSkipUntil ) {
* need_skip = true ;
* need_skip = true ;
compaction_filter_skip_until_ . ConvertFromUserKey ( kMaxSequenceNumber ,
compaction_filter_skip_until_ . ConvertFromUserKey ( kMaxSequenceNumber ,
kValueTypeForSeek ) ;
kValueTypeForSeek ) ;
* skip_until = compaction_filter_skip_until_ . Encode ( ) ;
* skip_until = compaction_filter_skip_until_ . Encode ( ) ;
} else if ( filter = = CompactionFilter : : Decision : : kChangeBlobIndex ) {
} else if ( decision = = CompactionFilter : : Decision : : kChangeBlobIndex ) {
// Only the StackableDB-based BlobDB impl's compaction filter should return
// Only the StackableDB-based BlobDB impl's compaction filter should return
// kChangeBlobIndex. Decision about rewriting blob and changing blob index
// kChangeBlobIndex. Decision about rewriting blob and changing blob index
// in the integrated BlobDB impl is made in subsequent call to
// in the integrated BlobDB impl is made in subsequent call to
@ -365,23 +370,27 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
validity_info_ . Invalidate ( ) ;
validity_info_ . Invalidate ( ) ;
return false ;
return false ;
}
}
if ( ikey_ . type = = kTypeValue ) {
// value transfer from inlined data to blob file
if ( ikey_ . type ! = kTypeBlobIndex ) {
ikey_ . type = kTypeBlobIndex ;
ikey_ . type = kTypeBlobIndex ;
current_key_ . UpdateInternalKey ( ikey_ . sequence , ikey_ . type ) ;
current_key_ . UpdateInternalKey ( ikey_ . sequence , kTypeBlobIndex ) ;
}
}
value_ = compaction_filter_value_ ;
value_ = compaction_filter_value_ ;
} else if ( filter = = CompactionFilter : : Decision : : kIOError ) {
} else if ( decision = = CompactionFilter : : Decision : : kIOError ) {
if ( ! compaction_filter_ - > IsStackedBlobDbInternalCompactionFilter ( ) ) {
if ( ! compaction_filter_ - > IsStackedBlobDbInternalCompactionFilter ( ) ) {
status_ = Status : : NotSupported (
status_ = Status : : NotSupported (
" CompactionFilter for integrated BlobDB should not return kIOError " ) ;
" CompactionFilter for integrated BlobDB should not return kIOError " ) ;
validity_info_ . Invalidate ( ) ;
validity_info_ . Invalidate ( ) ;
return false ;
return false ;
}
}
status_ = Status : : IOError ( " Failed to access blob during compaction filter " ) ;
status_ = Status : : IOError ( " Failed to access blob during compaction filter " ) ;
error = true ;
validity_info_ . Invalidate ( ) ;
return false ;
}
}
return ! error ;
return true ;
}
}
void CompactionIterator : : NextFromInput ( ) {
void CompactionIterator : : NextFromInput ( ) {