Implements real hash join and anti join

pull/171/head
Tpt 3 years ago
parent ba98e2d1f8
commit 6287b4e4c9
  1. 158
      lib/src/sparql/eval.rs
  2. 4
      lib/src/sparql/plan.rs

@ -271,12 +271,18 @@ impl SimpleEvaluator {
}) })
} }
PlanNode::HashJoin { left, right } => { PlanNode::HashJoin { left, right } => {
let join_keys: Vec<_> = left
.always_bound_variables()
.intersection(&right.always_bound_variables())
.copied()
.collect();
let left = self.plan_evaluator(left); let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right); let right = self.plan_evaluator(right);
if join_keys.is_empty() {
// Cartesian product
Rc::new(move |from| { Rc::new(move |from| {
//TODO: very dumb implementation
let mut errors = Vec::default(); let mut errors = Vec::default();
let left_values = left(from.clone()) let right_values = right(from.clone())
.filter_map(|result| match result { .filter_map(|result| match result {
Ok(result) => Some(result), Ok(result) => Some(result),
Err(error) => { Err(error) => {
@ -285,13 +291,41 @@ impl SimpleEvaluator {
} }
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Box::new(JoinIterator { Box::new(CartesianProductJoinIterator {
left: left_values, left_iter: left(from),
right_iter: right(from), right: right_values,
buffered_results: errors,
})
})
} else {
// Real hash join
Rc::new(move |from| {
let mut errors = Vec::default();
let mut right_values = HashMap::new();
for result in right(from.clone()) {
match result {
Ok(result) => {
let key =
join_keys.iter().map(|k| result.get(*k).cloned()).collect();
right_values
.entry(key)
.or_insert_with(Vec::new)
.push(result);
}
Err(error) => {
errors.push(Err(error));
}
}
}
Box::new(HashJoinIterator {
left_iter: left(from),
right: right_values,
buffered_results: errors, buffered_results: errors,
join_keys: join_keys.clone(),
}) })
}) })
} }
}
PlanNode::ForLoopJoin { left, right } => { PlanNode::ForLoopJoin { left, right } => {
let left = self.plan_evaluator(left); let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right); let right = self.plan_evaluator(right);
@ -304,18 +338,40 @@ impl SimpleEvaluator {
}) })
} }
PlanNode::AntiJoin { left, right } => { PlanNode::AntiJoin { left, right } => {
let join_keys: Vec<_> = left
.always_bound_variables()
.intersection(&right.always_bound_variables())
.copied()
.collect();
let left = self.plan_evaluator(left); let left = self.plan_evaluator(left);
let right = self.plan_evaluator(right); let right = self.plan_evaluator(right);
if join_keys.is_empty() {
Rc::new(move |from| { Rc::new(move |from| {
//TODO: dumb implementation
let right: Vec<_> = right(from.clone()) let right: Vec<_> = right(from.clone())
.filter_map(std::result::Result::ok) .filter_map(std::result::Result::ok)
.collect(); .collect();
Box::new(AntiJoinIterator { Box::new(CartesianProductAntiJoinIterator {
left_iter: left(from), left_iter: left(from),
right, right,
}) })
}) })
} else {
Rc::new(move |from| {
let mut right_values = HashMap::new();
for result in right(from.clone()).filter_map(std::result::Result::ok) {
let key = join_keys.iter().map(|k| result.get(*k).cloned()).collect();
right_values
.entry(key)
.or_insert_with(Vec::new)
.push(result);
}
Box::new(HashAntiJoinIterator {
left_iter: left(from),
right: right_values,
join_keys: join_keys.clone(),
})
})
}
} }
PlanNode::LeftJoin { PlanNode::LeftJoin {
left, left,
@ -2898,13 +2954,41 @@ pub fn are_compatible_and_not_disjointed(a: &EncodedTuple, b: &EncodedTuple) ->
found_intersection found_intersection
} }
struct JoinIterator { struct CartesianProductJoinIterator {
left: Vec<EncodedTuple>, left_iter: EncodedTuplesIterator,
right_iter: EncodedTuplesIterator, right: Vec<EncodedTuple>,
buffered_results: Vec<Result<EncodedTuple, EvaluationError>>,
}
impl Iterator for CartesianProductJoinIterator {
type Item = Result<EncodedTuple, EvaluationError>;
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> {
loop {
if let Some(result) = self.buffered_results.pop() {
return Some(result);
}
let left_tuple = match self.left_iter.next()? {
Ok(left_tuple) => left_tuple,
Err(error) => return Some(Err(error)),
};
for right_tuple in &self.right {
if let Some(result_tuple) = left_tuple.combine_with(right_tuple) {
self.buffered_results.push(Ok(result_tuple))
}
}
}
}
}
struct HashJoinIterator {
left_iter: EncodedTuplesIterator,
right: HashMap<Vec<Option<EncodedTerm>>, Vec<EncodedTuple>>,
buffered_results: Vec<Result<EncodedTuple, EvaluationError>>, buffered_results: Vec<Result<EncodedTuple, EvaluationError>>,
join_keys: Vec<usize>,
} }
impl Iterator for JoinIterator { impl Iterator for HashJoinIterator {
type Item = Result<EncodedTuple, EvaluationError>; type Item = Result<EncodedTuple, EvaluationError>;
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> { fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> {
@ -2912,25 +2996,32 @@ impl Iterator for JoinIterator {
if let Some(result) = self.buffered_results.pop() { if let Some(result) = self.buffered_results.pop() {
return Some(result); return Some(result);
} }
let right_tuple = match self.right_iter.next()? { let left_tuple = match self.left_iter.next()? {
Ok(right_tuple) => right_tuple, Ok(left_tuple) => left_tuple,
Err(error) => return Some(Err(error)), Err(error) => return Some(Err(error)),
}; };
for left_tuple in &self.left { let key: Vec<_> = self
if let Some(result_tuple) = left_tuple.combine_with(&right_tuple) { .join_keys
.iter()
.map(|k| left_tuple.get(*k).cloned())
.collect();
if let Some(right_tuples) = self.right.get(&key) {
for right_tuple in right_tuples {
if let Some(result_tuple) = left_tuple.combine_with(right_tuple) {
self.buffered_results.push(Ok(result_tuple)) self.buffered_results.push(Ok(result_tuple))
} }
} }
} }
} }
}
} }
struct AntiJoinIterator { struct CartesianProductAntiJoinIterator {
left_iter: EncodedTuplesIterator, left_iter: EncodedTuplesIterator,
right: Vec<EncodedTuple>, right: Vec<EncodedTuple>,
} }
impl Iterator for AntiJoinIterator { impl Iterator for CartesianProductAntiJoinIterator {
type Item = Result<EncodedTuple, EvaluationError>; type Item = Result<EncodedTuple, EvaluationError>;
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> { fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> {
@ -2950,6 +3041,39 @@ impl Iterator for AntiJoinIterator {
} }
} }
struct HashAntiJoinIterator {
left_iter: EncodedTuplesIterator,
right: HashMap<Vec<Option<EncodedTerm>>, Vec<EncodedTuple>>,
join_keys: Vec<usize>,
}
impl Iterator for HashAntiJoinIterator {
type Item = Result<EncodedTuple, EvaluationError>;
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> {
loop {
match self.left_iter.next()? {
Ok(left_tuple) => {
let key: Vec<_> = self
.join_keys
.iter()
.map(|k| left_tuple.get(*k).cloned())
.collect();
let exists_compatible_right = self.right.get(&key).map_or(false, |r| {
r.iter().any(|right_tuple| {
are_compatible_and_not_disjointed(&left_tuple, right_tuple)
})
});
if !exists_compatible_right {
return Some(Ok(left_tuple));
}
}
Err(error) => return Some(Err(error)),
}
}
}
}
struct LeftJoinIterator { struct LeftJoinIterator {
right_evaluator: Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator>, right_evaluator: Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator>,
left_iter: EncodedTuplesIterator, left_iter: EncodedTuplesIterator,

@ -30,14 +30,17 @@ pub enum PlanNode {
object: PatternValue, object: PatternValue,
graph_name: PatternValue, graph_name: PatternValue,
}, },
/// Streams left and materializes right join
HashJoin { HashJoin {
left: Rc<Self>, left: Rc<Self>,
right: Rc<Self>, right: Rc<Self>,
}, },
/// Right nested in left loop
ForLoopJoin { ForLoopJoin {
left: Rc<Self>, left: Rc<Self>,
right: Rc<Self>, right: Rc<Self>,
}, },
/// Streams left and materializes right anti join
AntiJoin { AntiJoin {
left: Rc<Self>, left: Rc<Self>,
right: Rc<Self>, right: Rc<Self>,
@ -49,6 +52,7 @@ pub enum PlanNode {
Union { Union {
children: Vec<Rc<Self>>, children: Vec<Rc<Self>>,
}, },
/// right nested in left loop
LeftJoin { LeftJoin {
left: Rc<Self>, left: Rc<Self>,
right: Rc<Self>, right: Rc<Self>,

Loading…
Cancel
Save