diff --git a/lib/src/sparql/eval.rs b/lib/src/sparql/eval.rs index fd1bdecb..66d76d79 100644 --- a/lib/src/sparql/eval.rs +++ b/lib/src/sparql/eval.rs @@ -444,7 +444,39 @@ impl SimpleEvaluator { }) } } - PlanNode::LeftJoin { + PlanNode::HashLeftJoin { + left, + right, + expression, + } => { + let join_keys: Vec<_> = left + .always_bound_variables() + .intersection(&right.always_bound_variables()) + .copied() + .collect(); + let left = self.plan_evaluator(left); + let right = self.plan_evaluator(right); + let expression = self.expression_evaluator(expression); + // Real hash join + Rc::new(move |from| { + let mut errors = Vec::default(); + let mut right_values = EncodedTupleSet::new(join_keys.clone()); + right_values.extend(right(from.clone()).filter_map(|result| match result { + Ok(result) => Some(result), + Err(error) => { + errors.push(Err(error)); + None + } + })); + Box::new(HashLeftJoinIterator { + left_iter: left(from), + right: right_values, + buffered_results: errors, + expression: expression.clone(), + }) + }) + } + PlanNode::ForLoopLeftJoin { left, right, possible_problem_vars, @@ -454,7 +486,7 @@ impl SimpleEvaluator { let possible_problem_vars = possible_problem_vars.clone(); Rc::new(move |from| { if possible_problem_vars.is_empty() { - Box::new(LeftJoinIterator { + Box::new(ForLoopLeftJoinIterator { right_evaluator: right.clone(), left_iter: left(from), current_right: Box::new(empty()), @@ -3572,10 +3604,57 @@ impl Iterator for HashJoinIterator { Ok(left_tuple) => left_tuple, Err(error) => return Some(Err(error)), }; - for right_tuple in self.right.get(&left_tuple) { - if let Some(result_tuple) = left_tuple.combine_with(right_tuple) { - self.buffered_results.push(Ok(result_tuple)) - } + self.buffered_results.extend( + self.right + .get(&left_tuple) + .iter() + .filter_map(|right_tuple| left_tuple.combine_with(right_tuple).map(Ok)), + ) + } + } + + fn size_hint(&self) -> (usize, Option) { + ( + 0, + self.left_iter.size_hint().1.map(|v| v * self.right.len()), + ) + } +} + +struct HashLeftJoinIterator { + left_iter: EncodedTuplesIterator, + right: EncodedTupleSet, + buffered_results: Vec>, + expression: Rc Option>, +} + +impl Iterator for HashLeftJoinIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + loop { + if let Some(result) = self.buffered_results.pop() { + return Some(result); + } + let left_tuple = match self.left_iter.next()? { + Ok(left_tuple) => left_tuple, + Err(error) => return Some(Err(error)), + }; + self.buffered_results.extend( + self.right + .get(&left_tuple) + .iter() + .filter_map(|right_tuple| left_tuple.combine_with(right_tuple)) + .filter(|tuple| { + (self.expression)(tuple) + .and_then(|term| to_bool(&term)) + .unwrap_or(false) + }) + .map(Ok), + ); + if self.buffered_results.is_empty() { + // We have not manage to join with anything + return Some(Ok(left_tuple)); } } } @@ -3588,29 +3667,28 @@ impl Iterator for HashJoinIterator { } } -struct LeftJoinIterator { +struct ForLoopLeftJoinIterator { right_evaluator: Rc EncodedTuplesIterator>, left_iter: EncodedTuplesIterator, current_right: EncodedTuplesIterator, } -impl Iterator for LeftJoinIterator { +impl Iterator for ForLoopLeftJoinIterator { type Item = Result; fn next(&mut self) -> Option> { if let Some(tuple) = self.current_right.next() { return Some(tuple); } - match self.left_iter.next()? { - Ok(left_tuple) => { - self.current_right = (self.right_evaluator)(left_tuple.clone()); - if let Some(right_tuple) = self.current_right.next() { - Some(right_tuple) - } else { - Some(Ok(left_tuple)) - } - } - Err(error) => Some(Err(error)), + let left_tuple = match self.left_iter.next()? { + Ok(left_tuple) => left_tuple, + Err(error) => return Some(Err(error)), + }; + self.current_right = (self.right_evaluator)(left_tuple.clone()); + if let Some(right_tuple) = self.current_right.next() { + Some(right_tuple) + } else { + Some(Ok(left_tuple)) } } } diff --git a/lib/src/sparql/mod.rs b/lib/src/sparql/mod.rs index 1d7680f5..d12446dc 100644 --- a/lib/src/sparql/mod.rs +++ b/lib/src/sparql/mod.rs @@ -43,8 +43,13 @@ pub(crate) fn evaluate_query( spargebra::Query::Select { pattern, base_iri, .. } => { - let (plan, variables) = - PlanBuilder::build(&dataset, &pattern, true, &options.custom_functions)?; + let (plan, variables) = PlanBuilder::build( + &dataset, + &pattern, + true, + &options.custom_functions, + options.without_optimizations, + )?; Ok(SimpleEvaluator::new( Rc::new(dataset), base_iri.map(Rc::new), @@ -56,8 +61,13 @@ pub(crate) fn evaluate_query( spargebra::Query::Ask { pattern, base_iri, .. } => { - let (plan, _) = - PlanBuilder::build(&dataset, &pattern, false, &options.custom_functions)?; + let (plan, _) = PlanBuilder::build( + &dataset, + &pattern, + false, + &options.custom_functions, + options.without_optimizations, + )?; SimpleEvaluator::new( Rc::new(dataset), base_iri.map(Rc::new), @@ -72,13 +82,19 @@ pub(crate) fn evaluate_query( base_iri, .. } => { - let (plan, variables) = - PlanBuilder::build(&dataset, &pattern, false, &options.custom_functions)?; + let (plan, variables) = PlanBuilder::build( + &dataset, + &pattern, + false, + &options.custom_functions, + options.without_optimizations, + )?; let construct = PlanBuilder::build_graph_template( &dataset, &template, variables, &options.custom_functions, + options.without_optimizations, ); Ok(SimpleEvaluator::new( Rc::new(dataset), @@ -91,8 +107,13 @@ pub(crate) fn evaluate_query( spargebra::Query::Describe { pattern, base_iri, .. } => { - let (plan, _) = - PlanBuilder::build(&dataset, &pattern, false, &options.custom_functions)?; + let (plan, _) = PlanBuilder::build( + &dataset, + &pattern, + false, + &options.custom_functions, + options.without_optimizations, + )?; Ok(SimpleEvaluator::new( Rc::new(dataset), base_iri.map(Rc::new), @@ -128,6 +149,7 @@ pub struct QueryOptions { custom_functions: HashMap Option>>, http_timeout: Option, http_redirection_limit: usize, + without_optimizations: bool, } impl QueryOptions { @@ -213,6 +235,14 @@ impl QueryOptions { } }) } + + #[doc(hidden)] + #[inline] + #[must_use] + pub fn without_optimizations(mut self) -> Self { + self.without_optimizations = true; + self + } } /// Options for SPARQL update evaluation. diff --git a/lib/src/sparql/plan.rs b/lib/src/sparql/plan.rs index b5fd578a..696ae517 100644 --- a/lib/src/sparql/plan.rs +++ b/lib/src/sparql/plan.rs @@ -53,8 +53,14 @@ pub enum PlanNode { Union { children: Vec, }, + /// hash left join + HashLeftJoin { + left: Box, + right: Box, + expression: Box, + }, /// right nested in left loop - LeftJoin { + ForLoopLeftJoin { left: Box, right: Box, possible_problem_vars: Rc>, //Variables that should not be part of the entry of the left join @@ -163,9 +169,18 @@ impl PlanNode { Self::HashJoin { left, right } | Self::ForLoopJoin { left, right, .. } | Self::AntiJoin { left, right } - | Self::LeftJoin { left, right, .. } => { + | Self::ForLoopLeftJoin { left, right, .. } => { + left.lookup_used_variables(callback); + right.lookup_used_variables(callback); + } + Self::HashLeftJoin { + left, + right, + expression, + } => { left.lookup_used_variables(callback); right.lookup_used_variables(callback); + expression.lookup_used_variables(callback); } Self::Extend { child, @@ -304,7 +319,9 @@ impl PlanNode { left.lookup_always_bound_variables(callback); right.lookup_always_bound_variables(callback); } - Self::AntiJoin { left, .. } | Self::LeftJoin { left, .. } => { + Self::AntiJoin { left, .. } + | Self::HashLeftJoin { left, .. } + | Self::ForLoopLeftJoin { left, .. } => { left.lookup_always_bound_variables(callback); } Self::Extend { diff --git a/lib/src/sparql/plan_builder.rs b/lib/src/sparql/plan_builder.rs index a330e2e2..d3cfc198 100644 --- a/lib/src/sparql/plan_builder.rs +++ b/lib/src/sparql/plan_builder.rs @@ -15,6 +15,7 @@ use std::rc::Rc; pub struct PlanBuilder<'a> { dataset: &'a DatasetView, custom_functions: &'a HashMap Option>>, + with_optimizations: bool, } impl<'a> PlanBuilder<'a> { @@ -23,25 +24,27 @@ impl<'a> PlanBuilder<'a> { pattern: &GraphPattern, is_cardinality_meaningful: bool, custom_functions: &'a HashMap Option>>, + without_optimizations: bool, ) -> Result<(PlanNode, Vec), EvaluationError> { let mut variables = Vec::default(); let plan = PlanBuilder { dataset, custom_functions, + with_optimizations: !without_optimizations, } .build_for_graph_pattern( pattern, &mut variables, &PatternValue::Constant(EncodedTerm::DefaultGraph), )?; - let plan = if is_cardinality_meaningful { - plan - } else { + let plan = if !without_optimizations && !is_cardinality_meaningful { // let's reduce downstream task. // TODO: avoid if already REDUCED or DISTINCT PlanNode::Reduced { child: Box::new(plan), } + } else { + plan }; Ok((plan, variables)) } @@ -51,10 +54,12 @@ impl<'a> PlanBuilder<'a> { template: &[TriplePattern], mut variables: Vec, custom_functions: &'a HashMap Option>>, + without_optimizations: bool, ) -> Vec { PlanBuilder { dataset, custom_functions, + with_optimizations: !without_optimizations, } .build_for_graph_template(template, &mut variables) } @@ -66,19 +71,13 @@ impl<'a> PlanBuilder<'a> { graph_name: &PatternValue, ) -> Result { Ok(match pattern { - GraphPattern::Bgp { patterns } => sort_bgp(patterns) - .iter() - .map(|triple| PlanNode::QuadPattern { - subject: self.pattern_value_from_term_or_variable(&triple.subject, variables), - predicate: self - .pattern_value_from_named_node_or_variable(&triple.predicate, variables), - object: self.pattern_value_from_term_or_variable(&triple.object, variables), - graph_name: graph_name.clone(), - }) - .reduce(Self::new_join) - .unwrap_or_else(|| PlanNode::StaticBindings { - tuples: vec![EncodedTuple::with_capacity(variables.len())], - }), + GraphPattern::Bgp { patterns } => { + if self.with_optimizations { + self.build_for_bgp(sort_bgp(patterns), variables, graph_name) + } else { + self.build_for_bgp(patterns, variables, graph_name) + } + } GraphPattern::Path { subject, path, @@ -89,7 +88,7 @@ impl<'a> PlanBuilder<'a> { object: self.pattern_value_from_term_or_variable(object, variables), graph_name: graph_name.clone(), }, - GraphPattern::Join { left, right } => Self::new_join( + GraphPattern::Join { left, right } => self.new_join( self.build_for_graph_pattern(left, variables, graph_name)?, self.build_for_graph_pattern(right, variables, graph_name)?, ), @@ -103,24 +102,38 @@ impl<'a> PlanBuilder<'a> { let mut possible_problem_vars = BTreeSet::new(); Self::add_left_join_problematic_variables(&right, &mut possible_problem_vars); + if self.with_optimizations { + // TODO: don't use if SERVICE is inside of for loop - //We add the extra filter if needed - let right = if let Some(expr) = expression { - Self::push_filter( - Box::new(right), - Box::new(self.build_for_expression(expr, variables, graph_name)?), - ) + //We add the extra filter if needed + let right = if let Some(expr) = expression { + self.push_filter( + Box::new(right), + Box::new(self.build_for_expression(expr, variables, graph_name)?), + ) + } else { + right + }; + PlanNode::ForLoopLeftJoin { + left: Box::new(left), + right: Box::new(right), + possible_problem_vars: Rc::new(possible_problem_vars.into_iter().collect()), + } } else { - right - }; - - PlanNode::LeftJoin { - left: Box::new(left), - right: Box::new(right), - possible_problem_vars: Rc::new(possible_problem_vars.into_iter().collect()), + PlanNode::HashLeftJoin { + left: Box::new(left), + right: Box::new(right), + expression: Box::new( + expression + .as_ref() + .map_or(Ok(PlanExpression::Constant(true.into())), |e| { + self.build_for_expression(e, variables, graph_name) + })?, + ), + } } } - GraphPattern::Filter { expr, inner } => Self::push_filter( + GraphPattern::Filter { expr, inner } => self.push_filter( Box::new(self.build_for_graph_pattern(inner, variables, graph_name)?), Box::new(self.build_for_expression(expr, variables, graph_name)?), ), @@ -270,6 +283,27 @@ impl<'a> PlanBuilder<'a> { }) } + fn build_for_bgp<'b>( + &self, + patterns: impl IntoIterator, + variables: &mut Vec, + graph_name: &PatternValue, + ) -> PlanNode { + patterns + .into_iter() + .map(|triple| PlanNode::QuadPattern { + subject: self.pattern_value_from_term_or_variable(&triple.subject, variables), + predicate: self + .pattern_value_from_named_node_or_variable(&triple.predicate, variables), + object: self.pattern_value_from_term_or_variable(&triple.object, variables), + graph_name: graph_name.clone(), + }) + .reduce(|a, b| self.new_join(a, b)) + .unwrap_or_else(|| PlanNode::StaticBindings { + tuples: vec![EncodedTuple::with_capacity(variables.len())], + }) + } + fn build_for_path(&self, path: &PropertyPathExpression) -> PlanPropertyPath { match path { PropertyPathExpression::NamedNode(p) => PlanPropertyPath::Path(self.build_term(p)), @@ -1073,12 +1107,25 @@ impl<'a> PlanBuilder<'a> { PlanNode::AntiJoin { left, .. } => { Self::add_left_join_problematic_variables(left, set); } - PlanNode::LeftJoin { left, right, .. } => { + PlanNode::ForLoopLeftJoin { left, right, .. } => { Self::add_left_join_problematic_variables(left, set); right.lookup_used_variables(&mut |v| { set.insert(v); }); } + PlanNode::HashLeftJoin { + left, + right, + expression, + } => { + Self::add_left_join_problematic_variables(left, set); + right.lookup_used_variables(&mut |v| { + set.insert(v); + }); + expression.lookup_used_variables(&mut |v| { + set.insert(v); + }); + } PlanNode::Extend { child, expression, .. } => { @@ -1130,8 +1177,9 @@ impl<'a> PlanBuilder<'a> { } } - fn new_join(mut left: PlanNode, mut right: PlanNode) -> PlanNode { - if Self::is_fit_for_for_loop_join(&left) + fn new_join(&self, mut left: PlanNode, mut right: PlanNode) -> PlanNode { + if self.with_optimizations + && Self::is_fit_for_for_loop_join(&left) && Self::is_fit_for_for_loop_join(&right) && Self::has_some_common_variables(&left, &right) { @@ -1178,7 +1226,8 @@ impl<'a> PlanBuilder<'a> { } PlanNode::Union { children } => children.iter().all(Self::is_fit_for_for_loop_join), PlanNode::AntiJoin { .. } - | PlanNode::LeftJoin { .. } + | PlanNode::HashLeftJoin { .. } + | PlanNode::ForLoopLeftJoin { .. } | PlanNode::Service { .. } | PlanNode::Sort { .. } | PlanNode::HashDeduplicate { .. } @@ -1190,9 +1239,15 @@ impl<'a> PlanBuilder<'a> { } } - fn push_filter(node: Box, filter: Box) -> PlanNode { + fn push_filter(&self, node: Box, filter: Box) -> PlanNode { + if !self.with_optimizations { + return PlanNode::Filter { + child: node, + expression: filter, + }; + } if let PlanExpression::And(f1, f2) = *filter { - return Self::push_filter(Box::new(Self::push_filter(node, f1)), f2); + return self.push_filter(Box::new(self.push_filter(node, f1)), f2); } let mut filter_variables = BTreeSet::new(); filter.lookup_used_variables(&mut |v| { @@ -1203,19 +1258,19 @@ impl<'a> PlanBuilder<'a> { if filter_variables.iter().all(|v| left.is_variable_bound(*v)) { if filter_variables.iter().all(|v| right.is_variable_bound(*v)) { PlanNode::HashJoin { - left: Box::new(Self::push_filter(left, filter.clone())), - right: Box::new(Self::push_filter(right, filter)), + left: Box::new(self.push_filter(left, filter.clone())), + right: Box::new(self.push_filter(right, filter)), } } else { PlanNode::HashJoin { - left: Box::new(Self::push_filter(left, filter)), + left: Box::new(self.push_filter(left, filter)), right, } } } else if filter_variables.iter().all(|v| right.is_variable_bound(*v)) { PlanNode::HashJoin { left, - right: Box::new(Self::push_filter(right, filter)), + right: Box::new(self.push_filter(right, filter)), } } else { PlanNode::Filter { @@ -1227,14 +1282,14 @@ impl<'a> PlanBuilder<'a> { PlanNode::ForLoopJoin { left, right } => { if filter_variables.iter().all(|v| left.is_variable_bound(*v)) { PlanNode::ForLoopJoin { - left: Box::new(Self::push_filter(left, filter)), + left: Box::new(self.push_filter(left, filter)), right, } } else if filter_variables.iter().all(|v| right.is_variable_bound(*v)) { PlanNode::ForLoopJoin { //TODO: should we do that always? left, - right: Box::new(Self::push_filter(right, filter)), + right: Box::new(self.push_filter(right, filter)), } } else { PlanNode::Filter { @@ -1251,7 +1306,7 @@ impl<'a> PlanBuilder<'a> { //TODO: handle the case where the filter generates an expression variable if filter_variables.iter().all(|v| child.is_variable_bound(*v)) { PlanNode::Extend { - child: Box::new(Self::push_filter(child, filter)), + child: Box::new(self.push_filter(child, filter)), expression, position, } @@ -1269,7 +1324,7 @@ impl<'a> PlanBuilder<'a> { PlanNode::Filter { child, expression } => { if filter_variables.iter().all(|v| child.is_variable_bound(*v)) { PlanNode::Filter { - child: Box::new(Self::push_filter(child, filter)), + child: Box::new(self.push_filter(child, filter)), expression, } } else { @@ -1282,7 +1337,7 @@ impl<'a> PlanBuilder<'a> { PlanNode::Union { children } => PlanNode::Union { children: children .into_iter() - .map(|c| Self::push_filter(Box::new(c), filter.clone())) + .map(|c| self.push_filter(Box::new(c), filter.clone())) .collect(), }, node => PlanNode::Filter { diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index 6bd1e9e5..0cfe8888 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -123,6 +123,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> { algebra, false, &self.options.query_options.custom_functions, + !self.options.query_options.without_optimizations, )?; let evaluator = SimpleEvaluator::new( dataset.clone(), diff --git a/testsuite/src/sparql_evaluator.rs b/testsuite/src/sparql_evaluator.rs index 9a02df4d..acd81261 100644 --- a/testsuite/src/sparql_evaluator.rs +++ b/testsuite/src/sparql_evaluator.rs @@ -208,39 +208,50 @@ fn evaluate_evaluation_test(test: &Test) -> Result<()> { } } } - match store.query_opt(query, options) { - Err(error) => Err(anyhow!( - "Failure to execute query of {} with error: {}", - test, - error - )), - Ok(actual_results) => { - let expected_results = load_sparql_query_result(test.result.as_ref().unwrap()) + + for with_query_optimizer in [true, false] { + let mut options = options.clone(); + if !with_query_optimizer { + options = options.without_optimizations(); + } + match store.query_opt(query.clone(), options) { + Err(error) => { + return Err(anyhow!( + "Failure to execute query of {} with error: {}", + test, + error + )) + } + Ok(actual_results) => { + let expected_results = load_sparql_query_result( + test.result.as_ref().unwrap(), + ) .map_err(|e| { anyhow!("Error constructing expected graph for {}: {}", test, e) })?; - let with_order = - if let StaticQueryResults::Solutions { ordered, .. } = &expected_results { + let with_order = if let StaticQueryResults::Solutions { ordered, .. } = + &expected_results + { *ordered } else { false }; - let actual_results = - StaticQueryResults::from_query_results(actual_results, with_order)?; + let actual_results = + StaticQueryResults::from_query_results(actual_results, with_order)?; - if are_query_results_isomorphic(&expected_results, &actual_results) { - Ok(()) - } else { - Err(anyhow!("Failure on {}.\nExpected file:\n{}\nOutput file:\n{}\nParsed query:\n{}\nData:\n{}\n", + if !are_query_results_isomorphic(&expected_results, &actual_results) { + return Err(anyhow!("Failure on {}.\nExpected file:\n{}\nOutput file:\n{}\nParsed query:\n{}\nData:\n{}\n", test, expected_results, actual_results, Query::parse(&read_file_to_string(query_file)?, Some(query_file)).unwrap(), store - )) + )); + } } } } + Ok(()) } } }