@ -290,59 +290,69 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// (almost) silently dropping the put/delete. That's probably not what we
// want. Also if we're in compaction and it's a put, it would be nice to
// run compaction filter on it.
const Slice val = iter - > value ( ) ;
PinnableSlice blob_value ;
const Slice * val_ptr ;
if ( ( kTypeValue = = ikey . type | | kTypeBlobIndex = = ikey . type | |
kTypeWideColumnEntity = = ikey . type ) & &
( range_del_agg = = nullptr | |
! range_del_agg - > ShouldDelete (
ikey , RangeDelPositioningMode : : kForwardTraversal ) ) ) {
if ( ikey . type = = kTypeWideColumnEntity ) {
// TODO: support wide-column entities
return Status : : NotSupported (
" Merge currently not supported for wide-column entities " ) ;
} else if ( ikey . type = = kTypeBlobIndex ) {
BlobIndex blob_index ;
s = blob_index . DecodeFrom ( val ) ;
if ( ! s . ok ( ) ) {
return s ;
}
FilePrefetchBuffer * prefetch_buffer =
prefetch_buffers ? prefetch_buffers - > GetOrCreatePrefetchBuffer (
blob_index . file_number ( ) )
: nullptr ;
uint64_t bytes_read = 0 ;
assert ( blob_fetcher ) ;
s = blob_fetcher - > FetchBlob ( ikey . user_key , blob_index ,
prefetch_buffer , & blob_value ,
& bytes_read ) ;
if ( ! s . ok ( ) ) {
return s ;
}
val_ptr = & blob_value ;
if ( c_iter_stats ) {
+ + c_iter_stats - > num_blobs_read ;
c_iter_stats - > total_blob_bytes_read + = bytes_read ;
}
} else {
val_ptr = & val ;
std : : string merge_result ;
if ( range_del_agg & &
range_del_agg - > ShouldDelete (
ikey , RangeDelPositioningMode : : kForwardTraversal ) ) {
s = TimedFullMerge ( user_merge_operator_ , ikey . user_key , nullptr ,
merge_context_ . GetOperands ( ) , & merge_result , logger_ ,
stats_ , clock_ ,
/* result_operand */ nullptr ,
/* update_num_ops_stats */ false ) ;
} else if ( ikey . type = = kTypeValue ) {
const Slice val = iter - > value ( ) ;
s = TimedFullMerge ( user_merge_operator_ , ikey . user_key , & val ,
merge_context_ . GetOperands ( ) , & merge_result , logger_ ,
stats_ , clock_ ,
/* result_operand */ nullptr ,
/* update_num_ops_stats */ false ) ;
} else if ( ikey . type = = kTypeBlobIndex ) {
BlobIndex blob_index ;
s = blob_index . DecodeFrom ( iter - > value ( ) ) ;
if ( ! s . ok ( ) ) {
return s ;
}
FilePrefetchBuffer * prefetch_buffer =
prefetch_buffers ? prefetch_buffers - > GetOrCreatePrefetchBuffer (
blob_index . file_number ( ) )
: nullptr ;
uint64_t bytes_read = 0 ;
assert ( blob_fetcher ) ;
PinnableSlice blob_value ;
s = blob_fetcher - > FetchBlob ( ikey . user_key , blob_index , prefetch_buffer ,
& blob_value , & bytes_read ) ;
if ( ! s . ok ( ) ) {
return s ;
}
if ( c_iter_stats ) {
+ + c_iter_stats - > num_blobs_read ;
c_iter_stats - > total_blob_bytes_read + = bytes_read ;
}
s = TimedFullMerge ( user_merge_operator_ , ikey . user_key , & blob_value ,
merge_context_ . GetOperands ( ) , & merge_result , logger_ ,
stats_ , clock_ ,
/* result_operand */ nullptr ,
/* update_num_ops_stats */ false ) ;
} else if ( ikey . type = = kTypeWideColumnEntity ) {
// TODO: support wide-column entities
return Status : : NotSupported (
" Merge currently not supported for wide-column entities " ) ;
} else {
val_ptr = nullptr ;
s = TimedFullMerge ( user_merge_operator_ , ikey . user_key , nullptr ,
merge_context_ . GetOperands ( ) , & merge_result , logger_ ,
stats_ , clock_ ,
/* result_operand */ nullptr ,
/* update_num_ops_stats */ false ) ;
}
std : : string merge_result ;
s = TimedFullMerge (
user_merge_operator_ , ikey . user_key , val_ptr ,
merge_context_ . GetOperands ( ) , & merge_result , logger_ , stats_ , clock_ ,
/* result_operand */ nullptr , /* update_num_ops_stats */ false ) ;
// We store the result in keys_.back() and operands_.back()
// if nothing went wrong (i.e.: no operand corruption on disk)