Introduces PlanNode::ForLoopJoin

Makes for loop join more explicit and usable in more places
pull/171/head
Tpt 3 years ago
parent 1ddc9a8788
commit 4f7e396af0
  1. 68
      lib/src/sparql/eval.rs
  2. 27
      lib/src/sparql/plan.rs
  3. 104
      lib/src/sparql/plan_builder.rs

@ -105,7 +105,6 @@ impl SimpleEvaluator {
node: &PlanNode, node: &PlanNode,
) -> Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator> { ) -> Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator> {
match node { match node {
PlanNode::Init => Rc::new(move |from| Box::new(once(Ok(from)))),
PlanNode::StaticBindings { tuples } => { PlanNode::StaticBindings { tuples } => {
let tuples = tuples.clone(); let tuples = tuples.clone();
Rc::new(move |from| { Rc::new(move |from| {
@ -152,80 +151,57 @@ impl SimpleEvaluator {
} }
}) })
} }
PlanNode::QuadPatternJoin { PlanNode::QuadPattern {
child,
subject, subject,
predicate, predicate,
object, object,
graph_name, graph_name,
} => { } => {
let child = self.plan_evaluator(child);
let subject = subject.clone(); let subject = subject.clone();
let predicate = predicate.clone(); let predicate = predicate.clone();
let object = object.clone(); let object = object.clone();
let graph_name = graph_name.clone(); let graph_name = graph_name.clone();
let dataset = self.dataset.clone(); let dataset = self.dataset.clone();
Rc::new(move |from| { Rc::new(move |from| {
let subject = subject.clone();
let predicate = predicate.clone();
let object = object.clone();
let graph_name = graph_name.clone();
let dataset = dataset.clone();
Box::new(child(from).flat_map_ok(move |tuple| {
let iter = dataset.encoded_quads_for_pattern( let iter = dataset.encoded_quads_for_pattern(
get_pattern_value(&subject, &tuple).as_ref(), get_pattern_value(&subject, &from).as_ref(),
get_pattern_value(&predicate, &tuple).as_ref(), get_pattern_value(&predicate, &from).as_ref(),
get_pattern_value(&object, &tuple).as_ref(), get_pattern_value(&object, &from).as_ref(),
get_pattern_value(&graph_name, &tuple).as_ref(), get_pattern_value(&graph_name, &from).as_ref(),
); );
let subject = subject.clone(); let subject = subject.clone();
let predicate = predicate.clone(); let predicate = predicate.clone();
let object = object.clone(); let object = object.clone();
let graph_name = graph_name.clone(); let graph_name = graph_name.clone();
let iter: EncodedTuplesIterator =
Box::new(iter.filter_map(move |quad| match quad { Box::new(iter.filter_map(move |quad| match quad {
Ok(quad) => { Ok(quad) => {
let mut new_tuple = tuple.clone(); let mut new_tuple = from.clone();
put_pattern_value(&subject, quad.subject, &mut new_tuple)?; put_pattern_value(&subject, quad.subject, &mut new_tuple)?;
put_pattern_value(&predicate, quad.predicate, &mut new_tuple)?; put_pattern_value(&predicate, quad.predicate, &mut new_tuple)?;
put_pattern_value(&object, quad.object, &mut new_tuple)?; put_pattern_value(&object, quad.object, &mut new_tuple)?;
put_pattern_value( put_pattern_value(&graph_name, quad.graph_name, &mut new_tuple)?;
&graph_name,
quad.graph_name,
&mut new_tuple,
)?;
Some(Ok(new_tuple)) Some(Ok(new_tuple))
} }
Err(error) => Some(Err(error)), Err(error) => Some(Err(error)),
}));
iter
})) }))
}) })
} }
PlanNode::PathPatternJoin { PlanNode::PathPattern {
child,
subject, subject,
path, path,
object, object,
graph_name, graph_name,
} => { } => {
let child = self.plan_evaluator(child);
let eval = self.clone(); let eval = self.clone();
let subject = subject.clone(); let subject = subject.clone();
let path = path.clone(); let path = path.clone();
let object = object.clone(); let object = object.clone();
let graph_name = graph_name.clone(); let graph_name = graph_name.clone();
Rc::new(move |from| { Rc::new(move |from| {
let eval = eval.clone(); let input_subject = get_pattern_value(&subject, &from);
let subject = subject.clone(); let input_object = get_pattern_value(&object, &from);
let path = path.clone();
let object = object.clone();
let graph_name = graph_name.clone();
Box::new(child(from).flat_map_ok(move |tuple| {
let input_subject = get_pattern_value(&subject, &tuple);
let input_object = get_pattern_value(&object, &tuple);
let input_graph_name = let input_graph_name =
if let Some(graph_name) = get_pattern_value(&graph_name, &tuple) { if let Some(graph_name) = get_pattern_value(&graph_name, &from) {
graph_name graph_name
} else { } else {
let result: EncodedTuplesIterator = let result: EncodedTuplesIterator =
@ -240,7 +216,7 @@ impl SimpleEvaluator {
.filter_map(move |o| match o { .filter_map(move |o| match o {
Ok(o) => { Ok(o) => {
if o == input_object { if o == input_object {
Some(Ok(tuple.clone())) Some(Ok(from.clone()))
} else { } else {
None None
} }
@ -254,7 +230,7 @@ impl SimpleEvaluator {
eval.eval_path_from(&path, &input_subject, &input_graph_name) eval.eval_path_from(&path, &input_subject, &input_graph_name)
.filter_map(move |o| match o { .filter_map(move |o| match o {
Ok(o) => { Ok(o) => {
let mut new_tuple = tuple.clone(); let mut new_tuple = from.clone();
put_pattern_value(&object, o, &mut new_tuple)?; put_pattern_value(&object, o, &mut new_tuple)?;
Some(Ok(new_tuple)) Some(Ok(new_tuple))
} }
@ -268,7 +244,7 @@ impl SimpleEvaluator {
eval.eval_path_to(&path, &input_object, &input_graph_name) eval.eval_path_to(&path, &input_object, &input_graph_name)
.filter_map(move |s| match s { .filter_map(move |s| match s {
Ok(s) => { Ok(s) => {
let mut new_tuple = tuple.clone(); let mut new_tuple = from.clone();
put_pattern_value(&subject, s, &mut new_tuple)?; put_pattern_value(&subject, s, &mut new_tuple)?;
Some(Ok(new_tuple)) Some(Ok(new_tuple))
} }
@ -282,7 +258,7 @@ impl SimpleEvaluator {
Box::new(eval.eval_open_path(&path, &input_graph_name).filter_map( Box::new(eval.eval_open_path(&path, &input_graph_name).filter_map(
move |so| match so { move |so| match so {
Ok((s, o)) => { Ok((s, o)) => {
let mut new_tuple = tuple.clone(); let mut new_tuple = from.clone();
put_pattern_value(&subject, s, &mut new_tuple)?; put_pattern_value(&subject, s, &mut new_tuple)?;
put_pattern_value(&object, o, &mut new_tuple)?; put_pattern_value(&object, o, &mut new_tuple)?;
Some(Ok(new_tuple)) Some(Ok(new_tuple))
@ -292,10 +268,9 @@ impl SimpleEvaluator {
)) ))
} }
} }
}))
}) })
} }
PlanNode::Join { left, right } => { PlanNode::HashJoin { left, right } => {
let left = self.plan_evaluator(left); let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right); let right = self.plan_evaluator(right);
Rc::new(move |from| { Rc::new(move |from| {
@ -317,6 +292,17 @@ impl SimpleEvaluator {
}) })
}) })
} }
PlanNode::ForLoopJoin { left, right } => {
let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right);
Rc::new(move |from| {
let right = right.clone();
Box::new(left(from).flat_map(move |t| match t {
Ok(t) => right(t),
Err(e) => Box::new(once(Err(e))),
}))
})
}
PlanNode::AntiJoin { left, right } => { PlanNode::AntiJoin { left, right } => {
let left = self.plan_evaluator(left); let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right); let right = self.plan_evaluator(right);

@ -6,7 +6,6 @@ use std::rc::Rc;
#[derive(Eq, PartialEq, Debug, Clone, Hash)] #[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum PlanNode { pub enum PlanNode {
Init,
StaticBindings { StaticBindings {
tuples: Vec<EncodedTuple>, tuples: Vec<EncodedTuple>,
}, },
@ -17,21 +16,23 @@ pub enum PlanNode {
graph_pattern: Rc<GraphPattern>, graph_pattern: Rc<GraphPattern>,
silent: bool, silent: bool,
}, },
QuadPatternJoin { QuadPattern {
child: Rc<Self>,
subject: PatternValue, subject: PatternValue,
predicate: PatternValue, predicate: PatternValue,
object: PatternValue, object: PatternValue,
graph_name: PatternValue, graph_name: PatternValue,
}, },
PathPatternJoin { PathPattern {
child: Rc<Self>,
subject: PatternValue, subject: PatternValue,
path: Rc<PlanPropertyPath>, path: Rc<PlanPropertyPath>,
object: PatternValue, object: PatternValue,
graph_name: PatternValue, graph_name: PatternValue,
}, },
Join { HashJoin {
left: Rc<Self>,
right: Rc<Self>,
},
ForLoopJoin {
left: Rc<Self>, left: Rc<Self>,
right: Rc<Self>, right: Rc<Self>,
}, },
@ -93,7 +94,6 @@ impl PlanNode {
pub fn add_maybe_bound_variables(&self, set: &mut BTreeSet<usize>) { pub fn add_maybe_bound_variables(&self, set: &mut BTreeSet<usize>) {
match self { match self {
PlanNode::Init => (),
PlanNode::StaticBindings { tuples } => { PlanNode::StaticBindings { tuples } => {
for tuple in tuples { for tuple in tuples {
for (key, value) in tuple.iter().enumerate() { for (key, value) in tuple.iter().enumerate() {
@ -103,8 +103,7 @@ impl PlanNode {
} }
} }
} }
PlanNode::QuadPatternJoin { PlanNode::QuadPattern {
child,
subject, subject,
predicate, predicate,
object, object,
@ -122,10 +121,8 @@ impl PlanNode {
if let PatternValue::Variable(var) = graph_name { if let PatternValue::Variable(var) = graph_name {
set.insert(*var); set.insert(*var);
} }
child.add_maybe_bound_variables(set);
} }
PlanNode::PathPatternJoin { PlanNode::PathPattern {
child,
subject, subject,
object, object,
graph_name, graph_name,
@ -140,7 +137,6 @@ impl PlanNode {
if let PatternValue::Variable(var) = graph_name { if let PatternValue::Variable(var) = graph_name {
set.insert(*var); set.insert(*var);
} }
child.add_maybe_bound_variables(set);
} }
PlanNode::Filter { child, expression } => { PlanNode::Filter { child, expression } => {
expression.add_maybe_bound_variables(set); expression.add_maybe_bound_variables(set);
@ -151,8 +147,9 @@ impl PlanNode {
child.add_maybe_bound_variables(set); child.add_maybe_bound_variables(set);
} }
} }
PlanNode::Join { left, right, .. } PlanNode::HashJoin { left, right }
| PlanNode::AntiJoin { left, right, .. } | PlanNode::ForLoopJoin { left, right, .. }
| PlanNode::AntiJoin { left, right }
| PlanNode::LeftJoin { left, right, .. } => { | PlanNode::LeftJoin { left, right, .. } => {
left.add_maybe_bound_variables(set); left.add_maybe_bound_variables(set);
right.add_maybe_bound_variables(set); right.add_maybe_bound_variables(set);

@ -43,12 +43,47 @@ impl<'a> PlanBuilder<'a> {
graph_name: &PatternValue, graph_name: &PatternValue,
) -> Result<PlanNode, EvaluationError> { ) -> Result<PlanNode, EvaluationError> {
Ok(match pattern { Ok(match pattern {
GraphPattern::Bgp { .. } GraphPattern::Bgp { patterns } => sort_bgp(patterns)
| GraphPattern::Path { .. } .iter()
| GraphPattern::Sequence { .. } => { .map(|triple| PlanNode::QuadPattern {
self.build_sequence(PlanNode::Init, pattern, variables, graph_name)? subject: self.pattern_value_from_term_or_variable(&triple.subject, variables),
} predicate: self
GraphPattern::Join { left, right } => PlanNode::Join { .pattern_value_from_named_node_or_variable(&triple.predicate, variables),
object: self.pattern_value_from_term_or_variable(&triple.object, variables),
graph_name: graph_name.clone(),
})
.reduce(|left, right| PlanNode::ForLoopJoin {
left: Rc::new(left),
right: Rc::new(right),
})
.unwrap_or_else(|| PlanNode::StaticBindings {
tuples: vec![EncodedTuple::with_capacity(variables.len())],
}),
GraphPattern::Path {
subject,
path,
object,
} => PlanNode::PathPattern {
subject: self.pattern_value_from_term_or_variable(subject, variables),
path: Rc::new(self.build_for_path(path)),
object: self.pattern_value_from_term_or_variable(object, variables),
graph_name: graph_name.clone(),
},
GraphPattern::Sequence(elements) => elements
.iter()
.map(|e| self.build_for_graph_pattern(e, variables, graph_name))
.reduce(|left, right| {
Ok(PlanNode::ForLoopJoin {
left: Rc::new(left?),
right: Rc::new(right?),
})
})
.unwrap_or_else(|| {
Ok(PlanNode::StaticBindings {
tuples: vec![EncodedTuple::with_capacity(variables.len())],
})
})?,
GraphPattern::Join { left, right } => PlanNode::HashJoin {
left: Rc::new(self.build_for_graph_pattern(left, variables, graph_name)?), left: Rc::new(self.build_for_graph_pattern(left, variables, graph_name)?),
right: Rc::new(self.build_for_graph_pattern(right, variables, graph_name)?), right: Rc::new(self.build_for_graph_pattern(right, variables, graph_name)?),
}, },
@ -255,54 +290,6 @@ impl<'a> PlanBuilder<'a> {
}) })
} }
fn build_sequence(
&mut self,
mut plan: PlanNode,
pattern: &GraphPattern,
variables: &mut Vec<Variable>,
graph_name: &PatternValue,
) -> Result<PlanNode, EvaluationError> {
match pattern {
GraphPattern::Bgp { patterns } => {
for triple in sort_bgp(patterns) {
plan = PlanNode::QuadPatternJoin {
child: Rc::new(plan),
subject: self
.pattern_value_from_term_or_variable(&triple.subject, variables),
predicate: self.pattern_value_from_named_node_or_variable(
&triple.predicate,
variables,
),
object: self.pattern_value_from_term_or_variable(&triple.object, variables),
graph_name: graph_name.clone(),
}
}
Ok(plan)
}
GraphPattern::Path {
subject,
path,
object,
} => Ok(PlanNode::PathPatternJoin {
child: Rc::new(plan),
subject: self.pattern_value_from_term_or_variable(subject, variables),
path: Rc::new(self.build_for_path(path)),
object: self.pattern_value_from_term_or_variable(object, variables),
graph_name: graph_name.clone(),
}),
GraphPattern::Graph { inner, name } => {
let graph_name = self.pattern_value_from_named_node_or_variable(name, variables);
self.build_sequence(plan, inner, variables, &graph_name)
}
GraphPattern::Sequence(elements) => elements.iter().fold(Ok(plan), |plan, element| {
self.build_sequence(plan?, element, variables, graph_name)
}),
_ => Err(EvaluationError::msg(
"Unexpected element in a sequence: {:?}.",
)),
}
}
fn build_for_path(&mut self, path: &PropertyPathExpression) -> PlanPropertyPath { fn build_for_path(&mut self, path: &PropertyPathExpression) -> PlanPropertyPath {
match path { match path {
PropertyPathExpression::NamedNode(p) => { PropertyPathExpression::NamedNode(p) => {
@ -1073,10 +1060,9 @@ impl<'a> PlanBuilder<'a> {
fn add_left_join_problematic_variables(&self, node: &PlanNode, set: &mut BTreeSet<usize>) { fn add_left_join_problematic_variables(&self, node: &PlanNode, set: &mut BTreeSet<usize>) {
match node { match node {
PlanNode::Init PlanNode::StaticBindings { .. }
| PlanNode::StaticBindings { .. } | PlanNode::QuadPattern { .. }
| PlanNode::QuadPatternJoin { .. } | PlanNode::PathPattern { .. } => (),
| PlanNode::PathPatternJoin { .. } => (),
PlanNode::Filter { child, expression } => { PlanNode::Filter { child, expression } => {
expression.add_maybe_bound_variables(set); //TODO: only if it is not already bound expression.add_maybe_bound_variables(set); //TODO: only if it is not already bound
self.add_left_join_problematic_variables(&*child, set); self.add_left_join_problematic_variables(&*child, set);
@ -1086,7 +1072,7 @@ impl<'a> PlanBuilder<'a> {
self.add_left_join_problematic_variables(child, set); self.add_left_join_problematic_variables(child, set);
} }
} }
PlanNode::Join { left, right, .. } => { PlanNode::HashJoin { left, right } | PlanNode::ForLoopJoin { left, right } => {
self.add_left_join_problematic_variables(&*left, set); self.add_left_join_problematic_variables(&*left, set);
self.add_left_join_problematic_variables(&*right, set); self.add_left_join_problematic_variables(&*right, set);
} }

Loading…
Cancel
Save