Implements reduced operation during evaluation

Cheap streaming implementation (no malloc needed)
pull/171/head
Tpt 3 years ago
parent 47c4734d51
commit 505980f026
  1. 42
      lib/src/sparql/eval.rs
  2. 6
      lib/src/sparql/plan.rs
  3. 8
      lib/src/sparql/plan_builder.rs

@ -494,6 +494,15 @@ impl SimpleEvaluator {
let child = self.plan_evaluator(child); let child = self.plan_evaluator(child);
Rc::new(move |from| Box::new(hash_deduplicate(child(from)))) Rc::new(move |from| Box::new(hash_deduplicate(child(from))))
} }
PlanNode::Reduced { child } => {
let child = self.plan_evaluator(child);
Rc::new(move |from| {
Box::new(ConsecutiveDeduplication {
inner: child(from),
current: None,
})
})
}
PlanNode::Skip { child, count } => { PlanNode::Skip { child, count } => {
let child = self.plan_evaluator(child); let child = self.plan_evaluator(child);
let count = *count; let count = *count;
@ -3147,6 +3156,39 @@ impl Iterator for UnionIterator {
} }
} }
struct ConsecutiveDeduplication {
inner: EncodedTuplesIterator,
current: Option<EncodedTuple>,
}
impl Iterator for ConsecutiveDeduplication {
type Item = Result<EncodedTuple, EvaluationError>;
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> {
// Basic idea. We buffer the previous result and we only emit it when we kow the next one or it's the end
loop {
if let Some(next) = self.inner.next() {
match next {
Ok(next) => match self.current.take() {
Some(current) if current != next => {
// We found a relevant value
self.current = Some(next);
return Some(Ok(current));
}
_ => {
// We discard the value and move to the next one
self.current = Some(next);
}
},
Err(error) => return Some(Err(error)), // We swap but it's fine. It's an error.
}
} else {
return self.current.take().map(Ok);
}
}
}
}
struct ConstructIterator { struct ConstructIterator {
eval: SimpleEvaluator, eval: SimpleEvaluator,
iter: EncodedTuplesIterator, iter: EncodedTuplesIterator,

@ -70,6 +70,10 @@ pub enum PlanNode {
HashDeduplicate { HashDeduplicate {
child: Box<Self>, child: Box<Self>,
}, },
/// Removes duplicated consecutive elements
Reduced {
child: Box<Self>,
},
Skip { Skip {
child: Box<Self>, child: Box<Self>,
count: usize, count: usize,
@ -173,6 +177,7 @@ impl PlanNode {
} }
PlanNode::Sort { child, .. } PlanNode::Sort { child, .. }
| PlanNode::HashDeduplicate { child } | PlanNode::HashDeduplicate { child }
| PlanNode::Reduced { child }
| PlanNode::Skip { child, .. } | PlanNode::Skip { child, .. }
| PlanNode::Limit { child, .. } => child.lookup_used_variables(callback), | PlanNode::Limit { child, .. } => child.lookup_used_variables(callback),
PlanNode::Service { PlanNode::Service {
@ -314,6 +319,7 @@ impl PlanNode {
} }
PlanNode::Sort { child, .. } PlanNode::Sort { child, .. }
| PlanNode::HashDeduplicate { child } | PlanNode::HashDeduplicate { child }
| PlanNode::Reduced { child }
| PlanNode::Skip { child, .. } | PlanNode::Skip { child, .. }
| PlanNode::Limit { child, .. } => child.lookup_always_bound_variables(callback), | PlanNode::Limit { child, .. } => child.lookup_always_bound_variables(callback),
PlanNode::Service { child, silent, .. } => { PlanNode::Service { child, silent, .. } => {

@ -246,9 +246,9 @@ impl<'a> PlanBuilder<'a> {
GraphPattern::Distinct { inner } => PlanNode::HashDeduplicate { GraphPattern::Distinct { inner } => PlanNode::HashDeduplicate {
child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?), child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
}, },
GraphPattern::Reduced { inner } => { GraphPattern::Reduced { inner } => PlanNode::Reduced {
self.build_for_graph_pattern(inner, variables, graph_name)? child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
} },
GraphPattern::Slice { GraphPattern::Slice {
inner, inner,
start, start,
@ -1096,6 +1096,7 @@ impl<'a> PlanBuilder<'a> {
} }
PlanNode::Sort { child, .. } PlanNode::Sort { child, .. }
| PlanNode::HashDeduplicate { child } | PlanNode::HashDeduplicate { child }
| PlanNode::Reduced { child }
| PlanNode::Skip { child, .. } | PlanNode::Skip { child, .. }
| PlanNode::Limit { child, .. } => { | PlanNode::Limit { child, .. } => {
self.add_left_join_problematic_variables(&*child, set) self.add_left_join_problematic_variables(&*child, set)
@ -1186,6 +1187,7 @@ impl<'a> PlanBuilder<'a> {
| PlanNode::Service { .. } | PlanNode::Service { .. }
| PlanNode::Sort { .. } | PlanNode::Sort { .. }
| PlanNode::HashDeduplicate { .. } | PlanNode::HashDeduplicate { .. }
| PlanNode::Reduced { .. }
| PlanNode::Skip { .. } | PlanNode::Skip { .. }
| PlanNode::Limit { .. } | PlanNode::Limit { .. }
| PlanNode::Project { .. } | PlanNode::Project { .. }

Loading…
Cancel
Save