Makes nested PlanNode easy to clone

pull/470/head
Tpt 2 years ago committed by Thomas Tanon
parent 9dc1106b9a
commit 81793bc221
  1. 55
      lib/src/sparql/eval.rs
  2. 8
      lib/src/sparql/mod.rs
  3. 44
      lib/src/sparql/plan.rs
  4. 126
      lib/src/sparql/plan_builder.rs
  5. 5
      lib/src/sparql/update.rs

@ -60,14 +60,14 @@ impl SimpleEvaluator {
pub fn evaluate_select_plan(
&self,
plan: &PlanNode,
plan: Rc<PlanNode>,
variables: Rc<Vec<Variable>>,
) -> QueryResults {
let iter = self.plan_evaluator(plan)(EncodedTuple::with_capacity(variables.len()));
QueryResults::Solutions(decode_bindings(self.dataset.clone(), iter, variables))
}
pub fn evaluate_ask_plan(&self, plan: &PlanNode) -> Result<QueryResults, EvaluationError> {
pub fn evaluate_ask_plan(&self, plan: Rc<PlanNode>) -> Result<QueryResults, EvaluationError> {
let from = EncodedTuple::with_capacity(plan.used_variables().len());
match self.plan_evaluator(plan)(from).next() {
Some(Ok(_)) => Ok(QueryResults::Boolean(true)),
@ -78,7 +78,7 @@ impl SimpleEvaluator {
pub fn evaluate_construct_plan(
&self,
plan: &PlanNode,
plan: Rc<PlanNode>,
template: Vec<TripleTemplate>,
) -> QueryResults {
let from = EncodedTuple::with_capacity(plan.used_variables().len());
@ -93,7 +93,7 @@ impl SimpleEvaluator {
})
}
pub fn evaluate_describe_plan(&self, plan: &PlanNode) -> QueryResults {
pub fn evaluate_describe_plan(&self, plan: Rc<PlanNode>) -> QueryResults {
let from = EncodedTuple::with_capacity(plan.used_variables().len());
QueryResults::Graph(QueryTripleIter {
iter: Box::new(DescribeIterator {
@ -106,9 +106,9 @@ impl SimpleEvaluator {
pub fn plan_evaluator(
&self,
node: &PlanNode,
node: Rc<PlanNode>,
) -> Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator> {
match node {
match node.as_ref() {
PlanNode::StaticBindings { encoded_tuples, .. } => {
let tuples = encoded_tuples.clone();
Rc::new(move |from| {
@ -348,8 +348,8 @@ impl SimpleEvaluator {
.intersection(&right.always_bound_variables())
.copied()
.collect();
let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right);
let left = self.plan_evaluator(left.clone());
let right = self.plan_evaluator(right.clone());
if join_keys.is_empty() {
// Cartesian product
Rc::new(move |from| {
@ -392,8 +392,8 @@ impl SimpleEvaluator {
}
}
PlanNode::ForLoopJoin { left, right } => {
let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right);
let left = self.plan_evaluator(left.clone());
let right = self.plan_evaluator(right.clone());
Rc::new(move |from| {
let right = right.clone();
Box::new(left(from).flat_map(move |t| match t {
@ -408,8 +408,8 @@ impl SimpleEvaluator {
.intersection(&right.always_bound_variables())
.copied()
.collect();
let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right);
let left = self.plan_evaluator(left.clone());
let right = self.plan_evaluator(right.clone());
if join_keys.is_empty() {
Rc::new(move |from| {
let right: Vec<_> = right(from.clone()).filter_map(Result::ok).collect();
@ -449,8 +449,8 @@ impl SimpleEvaluator {
.intersection(&right.always_bound_variables())
.copied()
.collect();
let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right);
let left = self.plan_evaluator(left.clone());
let right = self.plan_evaluator(right.clone());
let expression = self.expression_evaluator(expression);
// Real hash join
Rc::new(move |from| {
@ -476,8 +476,8 @@ impl SimpleEvaluator {
right,
possible_problem_vars,
} => {
let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right);
let left = self.plan_evaluator(left.clone());
let right = self.plan_evaluator(right.clone());
let possible_problem_vars = possible_problem_vars.clone();
Rc::new(move |from| {
if possible_problem_vars.is_empty() {
@ -499,7 +499,7 @@ impl SimpleEvaluator {
})
}
PlanNode::Filter { child, expression } => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
let expression = self.expression_evaluator(expression);
Rc::new(move |from| {
let expression = expression.clone();
@ -516,7 +516,7 @@ impl SimpleEvaluator {
PlanNode::Union { children } => {
let children: Vec<_> = children
.iter()
.map(|child| self.plan_evaluator(child))
.map(|child| self.plan_evaluator(child.clone()))
.collect();
Rc::new(move |from| {
Box::new(UnionIterator {
@ -532,7 +532,7 @@ impl SimpleEvaluator {
variable,
expression,
} => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
let position = variable.encoded;
let expression = self.expression_evaluator(expression);
Rc::new(move |from| {
@ -547,7 +547,7 @@ impl SimpleEvaluator {
})
}
PlanNode::Sort { child, by } => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
let by: Vec<_> = by
.iter()
.map(|comp| match comp {
@ -604,11 +604,11 @@ impl SimpleEvaluator {
})
}
PlanNode::HashDeduplicate { child } => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
Rc::new(move |from| Box::new(hash_deduplicate(child(from))))
}
PlanNode::Reduced { child } => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
Rc::new(move |from| {
Box::new(ConsecutiveDeduplication {
inner: child(from),
@ -617,17 +617,17 @@ impl SimpleEvaluator {
})
}
PlanNode::Skip { child, count } => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
let count = *count;
Rc::new(move |from| Box::new(child(from).skip(count)))
}
PlanNode::Limit { child, count } => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
let count = *count;
Rc::new(move |from| Box::new(child(from).take(count)))
}
PlanNode::Project { child, mapping } => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
let mapping = mapping.clone();
Rc::new(move |from| {
let mapping = mapping.clone();
@ -666,7 +666,7 @@ impl SimpleEvaluator {
key_variables,
aggregates,
} => {
let child = self.plan_evaluator(child);
let child = self.plan_evaluator(child.clone());
let key_variables = key_variables.clone();
let aggregate_input_expressions: Vec<_> = aggregates
.iter()
@ -864,8 +864,7 @@ impl SimpleEvaluator {
Rc::new(move |tuple| tuple.get(v).cloned())
}
PlanExpression::Exists(plan) => {
let plan = plan.clone();
let eval = self.plan_evaluator(&plan);
let eval = self.plan_evaluator(plan.clone());
Rc::new(move |tuple| Some(eval(tuple.clone()).next().is_some().into()))
}
PlanExpression::Or(a, b) => {

@ -56,7 +56,7 @@ pub(crate) fn evaluate_query(
options.service_handler(),
Rc::new(options.custom_functions),
)
.evaluate_select_plan(&plan, Rc::new(variables)))
.evaluate_select_plan(Rc::new(plan), Rc::new(variables)))
}
spargebra::Query::Ask {
pattern, base_iri, ..
@ -74,7 +74,7 @@ pub(crate) fn evaluate_query(
options.service_handler(),
Rc::new(options.custom_functions),
)
.evaluate_ask_plan(&plan)
.evaluate_ask_plan(Rc::new(plan))
}
spargebra::Query::Construct {
template,
@ -102,7 +102,7 @@ pub(crate) fn evaluate_query(
options.service_handler(),
Rc::new(options.custom_functions),
)
.evaluate_construct_plan(&plan, construct))
.evaluate_construct_plan(Rc::new(plan), construct))
}
spargebra::Query::Describe {
pattern, base_iri, ..
@ -120,7 +120,7 @@ pub(crate) fn evaluate_query(
options.service_handler(),
Rc::new(options.custom_functions),
)
.evaluate_describe_plan(&plan))
.evaluate_describe_plan(Rc::new(plan)))
}
}
}

@ -9,7 +9,7 @@ use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum PlanNode {
StaticBindings {
encoded_tuples: Vec<EncodedTuple>,
@ -19,7 +19,7 @@ pub enum PlanNode {
Service {
service_name: PatternValue,
variables: Rc<Vec<Variable>>,
child: Box<Self>,
child: Rc<Self>,
graph_pattern: Rc<GraphPattern>,
silent: bool,
},
@ -37,69 +37,69 @@ pub enum PlanNode {
},
/// Streams left and materializes right join
HashJoin {
left: Box<Self>,
right: Box<Self>,
left: Rc<Self>,
right: Rc<Self>,
},
/// Right nested in left loop
ForLoopJoin {
left: Box<Self>,
right: Box<Self>,
left: Rc<Self>,
right: Rc<Self>,
},
/// Streams left and materializes right anti join
AntiJoin {
left: Box<Self>,
right: Box<Self>,
left: Rc<Self>,
right: Rc<Self>,
},
Filter {
child: Box<Self>,
child: Rc<Self>,
expression: Box<PlanExpression>,
},
Union {
children: Vec<Self>,
children: Vec<Rc<Self>>,
},
/// hash left join
HashLeftJoin {
left: Box<Self>,
right: Box<Self>,
left: Rc<Self>,
right: Rc<Self>,
expression: Box<PlanExpression>,
},
/// right nested in left loop
ForLoopLeftJoin {
left: Box<Self>,
right: Box<Self>,
left: Rc<Self>,
right: Rc<Self>,
possible_problem_vars: Rc<Vec<usize>>, //Variables that should not be part of the entry of the left join
},
Extend {
child: Box<Self>,
child: Rc<Self>,
variable: PlanVariable,
expression: Box<PlanExpression>,
},
Sort {
child: Box<Self>,
child: Rc<Self>,
by: Vec<Comparator>,
},
HashDeduplicate {
child: Box<Self>,
child: Rc<Self>,
},
/// Removes duplicated consecutive elements
Reduced {
child: Box<Self>,
child: Rc<Self>,
},
Skip {
child: Box<Self>,
child: Rc<Self>,
count: usize,
},
Limit {
child: Box<Self>,
child: Rc<Self>,
count: usize,
},
Project {
child: Box<Self>,
child: Rc<Self>,
mapping: Rc<Vec<(PlanVariable, PlanVariable)>>, // pairs of (variable key in child, variable key in output)
},
Aggregate {
// By definition the group by key are the range 0..key_mapping.len()
child: Box<Self>,
child: Rc<Self>,
key_variables: Rc<Vec<PlanVariable>>,
aggregates: Rc<Vec<(PlanAggregation, PlanVariable)>>,
},

@ -46,7 +46,7 @@ impl<'a> PlanBuilder<'a> {
// let's reduce downstream task.
// TODO: avoid if already REDUCED or DISTINCT
PlanNode::Reduced {
child: Box::new(plan),
child: Rc::new(plan),
}
} else {
plan
@ -113,21 +113,21 @@ impl<'a> PlanBuilder<'a> {
//We add the extra filter if needed
let right = if let Some(expr) = expression {
self.push_filter(
Box::new(right),
Rc::new(right),
Box::new(self.build_for_expression(expr, variables, graph_name)?),
)
} else {
right
};
PlanNode::ForLoopLeftJoin {
left: Box::new(left),
right: Box::new(right),
left: Rc::new(left),
right: Rc::new(right),
possible_problem_vars: Rc::new(possible_problem_vars.into_iter().collect()),
}
} else {
PlanNode::HashLeftJoin {
left: Box::new(left),
right: Box::new(right),
left: Rc::new(left),
right: Rc::new(right),
expression: Box::new(expression.as_ref().map_or(
Ok(PlanExpression::Literal(PlanTerm {
encoded: true.into(),
@ -139,11 +139,11 @@ impl<'a> PlanBuilder<'a> {
}
}
GraphPattern::Lateral { left, right } => PlanNode::ForLoopJoin {
left: Box::new(self.build_for_graph_pattern(left, variables, graph_name)?),
right: Box::new(self.build_for_graph_pattern(right, variables, graph_name)?),
left: Rc::new(self.build_for_graph_pattern(left, variables, graph_name)?),
right: Rc::new(self.build_for_graph_pattern(right, variables, graph_name)?),
},
GraphPattern::Filter { expr, inner } => self.push_filter(
Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
Rc::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
Box::new(self.build_for_expression(expr, variables, graph_name)?),
),
GraphPattern::Union { left, right } => {
@ -157,9 +157,9 @@ impl<'a> PlanBuilder<'a> {
stack.push(left);
stack.push(right);
}
Some(p) => {
children.push(self.build_for_graph_pattern(p, variables, graph_name)?)
}
Some(p) => children.push(Rc::new(
self.build_for_graph_pattern(p, variables, graph_name)?,
)),
}
}
PlanNode::Union { children }
@ -173,13 +173,13 @@ impl<'a> PlanBuilder<'a> {
variable,
expression,
} => PlanNode::Extend {
child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
child: Rc::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
variable: build_plan_variable(variables, variable),
expression: Box::new(self.build_for_expression(expression, variables, graph_name)?),
},
GraphPattern::Minus { left, right } => PlanNode::AntiJoin {
left: Box::new(self.build_for_graph_pattern(left, variables, graph_name)?),
right: Box::new(self.build_for_graph_pattern(right, variables, graph_name)?),
left: Rc::new(self.build_for_graph_pattern(left, variables, graph_name)?),
right: Rc::new(self.build_for_graph_pattern(right, variables, graph_name)?),
},
GraphPattern::Service {
name,
@ -192,7 +192,7 @@ impl<'a> PlanBuilder<'a> {
PlanNode::Service {
service_name,
variables: Rc::new(variables.clone()),
child: Box::new(child),
child: Rc::new(child),
graph_pattern: Rc::new(inner.as_ref().clone()),
silent: *silent,
}
@ -202,7 +202,7 @@ impl<'a> PlanBuilder<'a> {
variables: by,
aggregates,
} => PlanNode::Aggregate {
child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
child: Rc::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
key_variables: Rc::new(
by.iter()
.map(|k| build_plan_variable(variables, k))
@ -266,7 +266,7 @@ impl<'a> PlanBuilder<'a> {
})
.collect();
PlanNode::Sort {
child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
child: Rc::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
by: condition?,
}
}
@ -278,7 +278,7 @@ impl<'a> PlanBuilder<'a> {
let inner_graph_name =
Self::convert_pattern_value_id(graph_name, &mut inner_variables);
PlanNode::Project {
child: Box::new(self.build_for_graph_pattern(
child: Rc::new(self.build_for_graph_pattern(
inner,
&mut inner_variables,
&inner_graph_name,
@ -301,10 +301,10 @@ impl<'a> PlanBuilder<'a> {
}
}
GraphPattern::Distinct { inner } => PlanNode::HashDeduplicate {
child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
child: Rc::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
},
GraphPattern::Reduced { inner } => PlanNode::Reduced {
child: Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
child: Rc::new(self.build_for_graph_pattern(inner, variables, graph_name)?),
},
GraphPattern::Slice {
inner,
@ -314,13 +314,13 @@ impl<'a> PlanBuilder<'a> {
let mut plan = self.build_for_graph_pattern(inner, variables, graph_name)?;
if *start > 0 {
plan = PlanNode::Skip {
child: Box::new(plan),
child: Rc::new(plan),
count: *start,
};
}
if let Some(length) = length {
plan = PlanNode::Limit {
child: Box::new(plan),
child: Rc::new(plan),
count: *length,
};
}
@ -1351,8 +1351,8 @@ impl<'a> PlanBuilder<'a> {
swap(&mut left, &mut right);
}
PlanNode::ForLoopJoin {
left: Box::new(left),
right: Box::new(right),
left: Rc::new(left),
right: Rc::new(right),
}
} else {
// Let's avoid materializing right if left is already materialized
@ -1361,8 +1361,8 @@ impl<'a> PlanBuilder<'a> {
swap(&mut left, &mut right);
}
PlanNode::HashJoin {
left: Box::new(left),
right: Box::new(right),
left: Rc::new(left),
right: Rc::new(right),
}
}
}
@ -1387,7 +1387,9 @@ impl<'a> PlanBuilder<'a> {
PlanNode::Filter { child, .. } | PlanNode::Extend { child, .. } => {
Self::is_fit_for_for_loop_join(child)
}
PlanNode::Union { children } => children.iter().all(Self::is_fit_for_for_loop_join),
PlanNode::Union { children } => {
children.iter().all(|c| Self::is_fit_for_for_loop_join(c))
}
PlanNode::AntiJoin { .. }
| PlanNode::HashLeftJoin { .. }
| PlanNode::ForLoopLeftJoin { .. }
@ -1402,7 +1404,7 @@ impl<'a> PlanBuilder<'a> {
}
}
fn push_filter(&self, node: Box<PlanNode>, filter: Box<PlanExpression>) -> PlanNode {
fn push_filter(&self, node: Rc<PlanNode>, filter: Box<PlanExpression>) -> PlanNode {
if !self.with_optimizations {
return PlanNode::Filter {
child: node,
@ -1410,34 +1412,37 @@ impl<'a> PlanBuilder<'a> {
};
}
if let PlanExpression::And(f1, f2) = *filter {
return self.push_filter(Box::new(self.push_filter(node, f1)), f2);
return self.push_filter(Rc::new(self.push_filter(node, f1)), f2);
}
let mut filter_variables = BTreeSet::new();
filter.lookup_used_variables(&mut |v| {
filter_variables.insert(v);
});
match *node {
match node.as_ref() {
PlanNode::HashJoin { left, right } => {
if filter_variables.iter().all(|v| left.is_variable_bound(*v)) {
if filter_variables.iter().all(|v| right.is_variable_bound(*v)) {
PlanNode::HashJoin {
left: Box::new(self.push_filter(left, filter.clone())),
right: Box::new(self.push_filter(right, filter)),
left: Rc::new(self.push_filter(left.clone(), filter.clone())),
right: Rc::new(self.push_filter(right.clone(), filter)),
}
} else {
PlanNode::HashJoin {
left: Box::new(self.push_filter(left, filter)),
right,
left: Rc::new(self.push_filter(left.clone(), filter)),
right: right.clone(),
}
}
} else if filter_variables.iter().all(|v| right.is_variable_bound(*v)) {
PlanNode::HashJoin {
left,
right: Box::new(self.push_filter(right, filter)),
left: left.clone(),
right: Rc::new(self.push_filter(right.clone(), filter)),
}
} else {
PlanNode::Filter {
child: Box::new(PlanNode::HashJoin { left, right }),
child: Rc::new(PlanNode::HashJoin {
left: left.clone(),
right: right.clone(),
}),
expression: filter,
}
}
@ -1445,18 +1450,21 @@ impl<'a> PlanBuilder<'a> {
PlanNode::ForLoopJoin { left, right } => {
if filter_variables.iter().all(|v| left.is_variable_bound(*v)) {
PlanNode::ForLoopJoin {
left: Box::new(self.push_filter(left, filter)),
right,
left: Rc::new(self.push_filter(left.clone(), filter)),
right: right.clone(),
}
} else if filter_variables.iter().all(|v| right.is_variable_bound(*v)) {
PlanNode::ForLoopJoin {
//TODO: should we do that always?
left,
right: Box::new(self.push_filter(right, filter)),
left: left.clone(),
right: Rc::new(self.push_filter(right.clone(), filter)),
}
} else {
PlanNode::Filter {
child: Box::new(PlanNode::HashJoin { left, right }),
child: Rc::new(PlanNode::HashJoin {
left: left.clone(),
right: right.clone(),
}),
expression: filter,
}
}
@ -1464,21 +1472,21 @@ impl<'a> PlanBuilder<'a> {
PlanNode::Extend {
child,
expression,
variable: position,
variable,
} => {
//TODO: handle the case where the filter generates an expression variable
if filter_variables.iter().all(|v| child.is_variable_bound(*v)) {
PlanNode::Extend {
child: Box::new(self.push_filter(child, filter)),
expression,
variable: position,
child: Rc::new(self.push_filter(child.clone(), filter)),
expression: expression.clone(),
variable: variable.clone(),
}
} else {
PlanNode::Filter {
child: Box::new(PlanNode::Extend {
child,
expression,
variable: position,
child: Rc::new(PlanNode::Extend {
child: child.clone(),
expression: expression.clone(),
variable: variable.clone(),
}),
expression: filter,
}
@ -1487,25 +1495,25 @@ impl<'a> PlanBuilder<'a> {
PlanNode::Filter { child, expression } => {
if filter_variables.iter().all(|v| child.is_variable_bound(*v)) {
PlanNode::Filter {
child: Box::new(self.push_filter(child, filter)),
expression,
child: Rc::new(self.push_filter(child.clone(), filter)),
expression: expression.clone(),
}
} else {
PlanNode::Filter {
child,
expression: Box::new(PlanExpression::And(expression, filter)),
child: child.clone(),
expression: Box::new(PlanExpression::And(expression.clone(), filter)),
}
}
}
PlanNode::Union { children } => PlanNode::Union {
children: children
.into_iter()
.map(|c| self.push_filter(Box::new(c), filter.clone()))
.iter()
.map(|c| Rc::new(self.push_filter(c.clone(), filter.clone())))
.collect(),
},
node => PlanNode::Filter {
_ => PlanNode::Filter {
//TODO: more?
child: Box::new(node),
child: node,
expression: filter,
},
}

@ -132,8 +132,9 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> {
Rc::new(self.options.query_options.custom_functions.clone()),
);
let mut bnodes = HashMap::new();
let tuples = evaluator.plan_evaluator(&plan)(EncodedTuple::with_capacity(variables.len()))
.collect::<Result<Vec<_>, _>>()?; //TODO: would be much better to stream
let tuples =
evaluator.plan_evaluator(Rc::new(plan))(EncodedTuple::with_capacity(variables.len()))
.collect::<Result<Vec<_>, _>>()?; //TODO: would be much better to stream
for tuple in tuples {
for quad in delete {
if let Some(quad) =

Loading…
Cancel
Save