@ -441,11 +441,9 @@ RangeDelAggregator::RangeDelAggregator(const InternalKeyComparator& icmp,
void RangeDelAggregator : : InitRep ( const std : : vector < SequenceNumber > & snapshots ) {
void RangeDelAggregator : : InitRep ( const std : : vector < SequenceNumber > & snapshots ) {
assert ( rep_ = = nullptr ) ;
assert ( rep_ = = nullptr ) ;
rep_ . reset ( new Rep ( ) ) ;
rep_ . reset ( new Rep ( ) ) ;
for ( auto snapshot : snapshots ) {
rep_ - > snapshots_ = snapshots ;
rep_ - > stripe_map_ . emplace ( snapshot , NewRangeDelMap ( ) ) ;
}
// Data newer than any snapshot falls in this catch-all stripe
// Data newer than any snapshot falls in this catch-all stripe
rep_ - > stripe_map _ . emplace ( kMaxSequenceNumber , NewRangeDelMap ( ) ) ;
rep_ - > snapshots_ . emplace_back ( kMaxSequenceNumber ) ;
rep_ - > pinned_iters_mgr_ . StartPinning ( ) ;
rep_ - > pinned_iters_mgr_ . StartPinning ( ) ;
}
}
@ -474,11 +472,11 @@ bool RangeDelAggregator::ShouldDeleteImpl(const ParsedInternalKey& parsed,
RangeDelPositioningMode mode ) {
RangeDelPositioningMode mode ) {
assert ( IsValueType ( parsed . type ) ) ;
assert ( IsValueType ( parsed . type ) ) ;
assert ( rep_ ! = nullptr ) ;
assert ( rep_ ! = nullptr ) ;
auto & tombstone_map = GetRangeDelMap ( parsed . sequence ) ;
auto * tombstone_map = GetRangeDelMapIfExists ( parsed . sequence ) ;
if ( tombstone_map . IsEmpty ( ) ) {
if ( tombstone_map = = nullptr | | tombstone_map - > IsEmpty ( ) ) {
return false ;
return false ;
}
}
return tombstone_map . ShouldDelete ( parsed , mode ) ;
return tombstone_map - > ShouldDelete ( parsed , mode ) ;
}
}
bool RangeDelAggregator : : IsRangeOverlapped ( const Slice & start ,
bool RangeDelAggregator : : IsRangeOverlapped ( const Slice & start ,
@ -492,7 +490,7 @@ bool RangeDelAggregator::IsRangeOverlapped(const Slice& start,
ParsedInternalKey start_ikey ( start , kMaxSequenceNumber , kMaxValue ) ;
ParsedInternalKey start_ikey ( start , kMaxSequenceNumber , kMaxValue ) ;
ParsedInternalKey end_ikey ( end , 0 , static_cast < ValueType > ( 0 ) ) ;
ParsedInternalKey end_ikey ( end , 0 , static_cast < ValueType > ( 0 ) ) ;
for ( const auto & stripe : rep_ - > stripe_map_ ) {
for ( const auto & stripe : rep_ - > stripe_map_ ) {
if ( stripe . second - > IsRangeOverlapped ( start_ikey , end_ikey ) ) {
if ( stripe . second . first - > IsRangeOverlapped ( start_ikey , end_ikey ) ) {
return true ;
return true ;
}
}
}
}
@ -587,24 +585,42 @@ void RangeDelAggregator::InvalidateRangeDelMapPositions() {
return ;
return ;
}
}
for ( auto & stripe : rep_ - > stripe_map_ ) {
for ( auto & stripe : rep_ - > stripe_map_ ) {
stripe . second - > InvalidatePosition ( ) ;
stripe . second . first - > InvalidatePosition ( ) ;
}
}
}
}
RangeDelMap & RangeDelAggregator : : GetRangeDelMap ( SequenceNumber seq ) {
RangeDelMap * RangeDelAggregator : : GetRangeDelMapIfExists ( SequenceNumber seq ) {
assert ( rep_ ! = nullptr ) ;
assert ( rep_ ! = nullptr ) ;
// The stripe includes seqnum for the snapshot above and excludes seqnum for
// The stripe includes seqnum for the snapshot above and excludes seqnum for
// the snapshot below.
// the snapshot below.
StripeMap : : iterator iter ;
if ( rep_ - > stripe_map_ . empty ( ) ) {
if ( seq > 0 ) {
return nullptr ;
// upper_bound() checks strict inequality so need to subtract one
iter = rep_ - > stripe_map_ . upper_bound ( seq - 1 ) ;
} else {
iter = rep_ - > stripe_map_ . begin ( ) ;
}
}
StripeMap : : iterator iter = rep_ - > stripe_map_ . lower_bound ( seq ) ;
if ( iter = = rep_ - > stripe_map_ . end ( ) ) {
return nullptr ;
}
size_t snapshot_idx = iter - > second . second ;
if ( snapshot_idx > 0 & & seq < = rep_ - > snapshots_ [ snapshot_idx - 1 ] ) {
return nullptr ;
}
return iter - > second . first . get ( ) ;
}
RangeDelMap & RangeDelAggregator : : GetRangeDelMap ( SequenceNumber seq ) {
assert ( rep_ ! = nullptr ) ;
// The stripe includes seqnum for the snapshot above and excludes seqnum for
// the snapshot below.
std : : vector < SequenceNumber > : : iterator iter =
std : : lower_bound ( rep_ - > snapshots_ . begin ( ) , rep_ - > snapshots_ . end ( ) , seq ) ;
// catch-all stripe justifies this assertion in either of above cases
// catch-all stripe justifies this assertion in either of above cases
assert ( iter ! = rep_ - > stripe_map_ . end ( ) ) ;
assert ( iter ! = rep_ - > snapshots_ . end ( ) ) ;
return * iter - > second ;
if ( rep_ - > stripe_map_ . find ( * iter ) = = rep_ - > stripe_map_ . end ( ) ) {
rep_ - > stripe_map_ . emplace (
* iter ,
std : : make_pair ( NewRangeDelMap ( ) , iter - rep_ - > snapshots_ . begin ( ) ) ) ;
}
return * rep_ - > stripe_map_ [ * iter ] . first ;
}
}
bool RangeDelAggregator : : IsEmpty ( ) {
bool RangeDelAggregator : : IsEmpty ( ) {
@ -612,7 +628,7 @@ bool RangeDelAggregator::IsEmpty() {
return true ;
return true ;
}
}
for ( const auto & stripe : rep_ - > stripe_map_ ) {
for ( const auto & stripe : rep_ - > stripe_map_ ) {
if ( ! stripe . second - > IsEmpty ( ) ) {
if ( ! stripe . second . first - > IsEmpty ( ) ) {
return false ;
return false ;
}
}
}
}
@ -696,7 +712,7 @@ std::unique_ptr<RangeDelIterator> RangeDelAggregator::NewIterator() {
new MergingRangeDelIter ( icmp_ . user_comparator ( ) ) ) ;
new MergingRangeDelIter ( icmp_ . user_comparator ( ) ) ) ;
if ( rep_ ! = nullptr ) {
if ( rep_ ! = nullptr ) {
for ( const auto & stripe : rep_ - > stripe_map_ ) {
for ( const auto & stripe : rep_ - > stripe_map_ ) {
iter - > AddIterator ( stripe . second - > NewIterator ( ) ) ;
iter - > AddIterator ( stripe . second . first - > NewIterator ( ) ) ;
}
}
}
}
return std : : move ( iter ) ;
return std : : move ( iter ) ;