diff --git a/lib/src/sparql/eval.rs b/lib/src/sparql/eval.rs index f79b660c..e4ba980e 100644 --- a/lib/src/sparql/eval.rs +++ b/lib/src/sparql/eval.rs @@ -271,26 +271,60 @@ impl SimpleEvaluator { }) } PlanNode::HashJoin { left, right } => { + let join_keys: Vec<_> = left + .always_bound_variables() + .intersection(&right.always_bound_variables()) + .copied() + .collect(); let left = self.plan_evaluator(left); let right = self.plan_evaluator(right); - Rc::new(move |from| { - //TODO: very dumb implementation - let mut errors = Vec::default(); - let left_values = left(from.clone()) - .filter_map(|result| match result { - Ok(result) => Some(result), - Err(error) => { - errors.push(Err(error)); - None + if join_keys.is_empty() { + // Cartesian product + Rc::new(move |from| { + let mut errors = Vec::default(); + let right_values = right(from.clone()) + .filter_map(|result| match result { + Ok(result) => Some(result), + Err(error) => { + errors.push(Err(error)); + None + } + }) + .collect::>(); + Box::new(CartesianProductJoinIterator { + left_iter: left(from), + right: right_values, + buffered_results: errors, + }) + }) + } else { + // Real hash join + Rc::new(move |from| { + let mut errors = Vec::default(); + let mut right_values = HashMap::new(); + for result in right(from.clone()) { + match result { + Ok(result) => { + let key = + join_keys.iter().map(|k| result.get(*k).cloned()).collect(); + right_values + .entry(key) + .or_insert_with(Vec::new) + .push(result); + } + Err(error) => { + errors.push(Err(error)); + } } + } + Box::new(HashJoinIterator { + left_iter: left(from), + right: right_values, + buffered_results: errors, + join_keys: join_keys.clone(), }) - .collect::>(); - Box::new(JoinIterator { - left: left_values, - right_iter: right(from), - buffered_results: errors, }) - }) + } } PlanNode::ForLoopJoin { left, right } => { let left = self.plan_evaluator(left); @@ -304,18 +338,40 @@ impl SimpleEvaluator { }) } PlanNode::AntiJoin { left, right } => { + let join_keys: Vec<_> = left + .always_bound_variables() + .intersection(&right.always_bound_variables()) + .copied() + .collect(); let left = self.plan_evaluator(left); let right = self.plan_evaluator(right); - Rc::new(move |from| { - //TODO: dumb implementation - let right: Vec<_> = right(from.clone()) - .filter_map(std::result::Result::ok) - .collect(); - Box::new(AntiJoinIterator { - left_iter: left(from), - right, + if join_keys.is_empty() { + Rc::new(move |from| { + let right: Vec<_> = right(from.clone()) + .filter_map(std::result::Result::ok) + .collect(); + Box::new(CartesianProductAntiJoinIterator { + left_iter: left(from), + right, + }) }) - }) + } else { + Rc::new(move |from| { + let mut right_values = HashMap::new(); + for result in right(from.clone()).filter_map(std::result::Result::ok) { + let key = join_keys.iter().map(|k| result.get(*k).cloned()).collect(); + right_values + .entry(key) + .or_insert_with(Vec::new) + .push(result); + } + Box::new(HashAntiJoinIterator { + left_iter: left(from), + right: right_values, + join_keys: join_keys.clone(), + }) + }) + } } PlanNode::LeftJoin { left, @@ -2898,13 +2954,13 @@ pub fn are_compatible_and_not_disjointed(a: &EncodedTuple, b: &EncodedTuple) -> found_intersection } -struct JoinIterator { - left: Vec, - right_iter: EncodedTuplesIterator, +struct CartesianProductJoinIterator { + left_iter: EncodedTuplesIterator, + right: Vec, buffered_results: Vec>, } -impl Iterator for JoinIterator { +impl Iterator for CartesianProductJoinIterator { type Item = Result; fn next(&mut self) -> Option> { @@ -2912,12 +2968,12 @@ impl Iterator for JoinIterator { if let Some(result) = self.buffered_results.pop() { return Some(result); } - let right_tuple = match self.right_iter.next()? { - Ok(right_tuple) => right_tuple, + let left_tuple = match self.left_iter.next()? { + Ok(left_tuple) => left_tuple, Err(error) => return Some(Err(error)), }; - for left_tuple in &self.left { - if let Some(result_tuple) = left_tuple.combine_with(&right_tuple) { + for right_tuple in &self.right { + if let Some(result_tuple) = left_tuple.combine_with(right_tuple) { self.buffered_results.push(Ok(result_tuple)) } } @@ -2925,12 +2981,47 @@ impl Iterator for JoinIterator { } } -struct AntiJoinIterator { +struct HashJoinIterator { + left_iter: EncodedTuplesIterator, + right: HashMap>, Vec>, + buffered_results: Vec>, + join_keys: Vec, +} + +impl Iterator for HashJoinIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + loop { + if let Some(result) = self.buffered_results.pop() { + return Some(result); + } + let left_tuple = match self.left_iter.next()? { + Ok(left_tuple) => left_tuple, + Err(error) => return Some(Err(error)), + }; + let key: Vec<_> = self + .join_keys + .iter() + .map(|k| left_tuple.get(*k).cloned()) + .collect(); + if let Some(right_tuples) = self.right.get(&key) { + for right_tuple in right_tuples { + if let Some(result_tuple) = left_tuple.combine_with(right_tuple) { + self.buffered_results.push(Ok(result_tuple)) + } + } + } + } + } +} + +struct CartesianProductAntiJoinIterator { left_iter: EncodedTuplesIterator, right: Vec, } -impl Iterator for AntiJoinIterator { +impl Iterator for CartesianProductAntiJoinIterator { type Item = Result; fn next(&mut self) -> Option> { @@ -2950,6 +3041,39 @@ impl Iterator for AntiJoinIterator { } } +struct HashAntiJoinIterator { + left_iter: EncodedTuplesIterator, + right: HashMap>, Vec>, + join_keys: Vec, +} + +impl Iterator for HashAntiJoinIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + loop { + match self.left_iter.next()? { + Ok(left_tuple) => { + let key: Vec<_> = self + .join_keys + .iter() + .map(|k| left_tuple.get(*k).cloned()) + .collect(); + let exists_compatible_right = self.right.get(&key).map_or(false, |r| { + r.iter().any(|right_tuple| { + are_compatible_and_not_disjointed(&left_tuple, right_tuple) + }) + }); + if !exists_compatible_right { + return Some(Ok(left_tuple)); + } + } + Err(error) => return Some(Err(error)), + } + } + } +} + struct LeftJoinIterator { right_evaluator: Rc EncodedTuplesIterator>, left_iter: EncodedTuplesIterator, diff --git a/lib/src/sparql/plan.rs b/lib/src/sparql/plan.rs index d1d161d9..af5755a0 100644 --- a/lib/src/sparql/plan.rs +++ b/lib/src/sparql/plan.rs @@ -30,14 +30,17 @@ pub enum PlanNode { object: PatternValue, graph_name: PatternValue, }, + /// Streams left and materializes right join HashJoin { left: Rc, right: Rc, }, + /// Right nested in left loop ForLoopJoin { left: Rc, right: Rc, }, + /// Streams left and materializes right anti join AntiJoin { left: Rc, right: Rc, @@ -49,6 +52,7 @@ pub enum PlanNode { Union { children: Vec>, }, + /// right nested in left loop LeftJoin { left: Rc, right: Rc,