|
|
|
@ -303,12 +303,15 @@ impl SimpleEvaluator { |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
let mut errors = Vec::default(); |
|
|
|
|
let mut right_values = EncodedTupleSet::new(join_keys.clone()); |
|
|
|
|
for result in right(from.clone()) { |
|
|
|
|
match result { |
|
|
|
|
Ok(result) => right_values.insert(result), |
|
|
|
|
Err(error) => errors.push(Err(error)), |
|
|
|
|
} |
|
|
|
|
right_values.extend(right(from.clone()).filter_map( |
|
|
|
|
|result| match result { |
|
|
|
|
Ok(result) => Some(result), |
|
|
|
|
Err(error) => { |
|
|
|
|
errors.push(Err(error)); |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
)); |
|
|
|
|
Box::new(HashJoinIterator { |
|
|
|
|
left_iter: left(from), |
|
|
|
|
right: right_values, |
|
|
|
@ -341,21 +344,30 @@ impl SimpleEvaluator { |
|
|
|
|
let right: Vec<_> = right(from.clone()) |
|
|
|
|
.filter_map(std::result::Result::ok) |
|
|
|
|
.collect(); |
|
|
|
|
Box::new(CartesianProductAntiJoinIterator { |
|
|
|
|
left_iter: left(from), |
|
|
|
|
right, |
|
|
|
|
Box::new(left(from).filter(move |left_tuple| { |
|
|
|
|
if let Ok(left_tuple) = left_tuple { |
|
|
|
|
!right.iter().any(|right_tuple| { |
|
|
|
|
are_compatible_and_not_disjointed(left_tuple, right_tuple) |
|
|
|
|
}) |
|
|
|
|
} else { |
|
|
|
|
true |
|
|
|
|
} |
|
|
|
|
})) |
|
|
|
|
}) |
|
|
|
|
} else { |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
let mut right_values = EncodedTupleSet::new(join_keys.clone()); |
|
|
|
|
for result in right(from.clone()).filter_map(std::result::Result::ok) { |
|
|
|
|
right_values.insert(result); |
|
|
|
|
} |
|
|
|
|
Box::new(HashAntiJoinIterator { |
|
|
|
|
left_iter: left(from), |
|
|
|
|
right: right_values, |
|
|
|
|
right_values |
|
|
|
|
.extend(right(from.clone()).filter_map(std::result::Result::ok)); |
|
|
|
|
Box::new(left(from).filter(move |left_tuple| { |
|
|
|
|
if let Ok(left_tuple) = left_tuple { |
|
|
|
|
!right_values.get(left_tuple).iter().any(|right_tuple| { |
|
|
|
|
are_compatible_and_not_disjointed(left_tuple, right_tuple) |
|
|
|
|
}) |
|
|
|
|
} else { |
|
|
|
|
true |
|
|
|
|
} |
|
|
|
|
})) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -2974,6 +2986,11 @@ impl Iterator for CartesianProductJoinIterator { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) { |
|
|
|
|
let (min, max) = self.left_iter.size_hint(); |
|
|
|
|
(min * self.right.len(), max.map(|v| v * self.right.len())) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct HashJoinIterator { |
|
|
|
@ -3001,56 +3018,12 @@ impl Iterator for HashJoinIterator { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct CartesianProductAntiJoinIterator { |
|
|
|
|
left_iter: EncodedTuplesIterator, |
|
|
|
|
right: Vec<EncodedTuple>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Iterator for CartesianProductAntiJoinIterator { |
|
|
|
|
type Item = Result<EncodedTuple, EvaluationError>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> { |
|
|
|
|
loop { |
|
|
|
|
match self.left_iter.next()? { |
|
|
|
|
Ok(left_tuple) => { |
|
|
|
|
let exists_compatible_right = self.right.iter().any(|right_tuple| { |
|
|
|
|
are_compatible_and_not_disjointed(&left_tuple, right_tuple) |
|
|
|
|
}); |
|
|
|
|
if !exists_compatible_right { |
|
|
|
|
return Some(Ok(left_tuple)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct HashAntiJoinIterator { |
|
|
|
|
left_iter: EncodedTuplesIterator, |
|
|
|
|
right: EncodedTupleSet, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Iterator for HashAntiJoinIterator { |
|
|
|
|
type Item = Result<EncodedTuple, EvaluationError>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> { |
|
|
|
|
loop { |
|
|
|
|
match self.left_iter.next()? { |
|
|
|
|
Ok(left_tuple) => { |
|
|
|
|
let exists_compatible_right = |
|
|
|
|
self.right.get(&left_tuple).iter().any(|right_tuple| { |
|
|
|
|
are_compatible_and_not_disjointed(&left_tuple, right_tuple) |
|
|
|
|
}); |
|
|
|
|
if !exists_compatible_right { |
|
|
|
|
return Some(Ok(left_tuple)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) { |
|
|
|
|
( |
|
|
|
|
0, |
|
|
|
|
self.left_iter.size_hint().1.map(|v| v * self.right.len()), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3187,6 +3160,11 @@ impl Iterator for ConsecutiveDeduplication { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) { |
|
|
|
|
let (min, max) = self.inner.size_hint(); |
|
|
|
|
(if min == 0 { 0 } else { 1 }, max) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct ConstructIterator { |
|
|
|
@ -3228,6 +3206,14 @@ impl Iterator for ConstructIterator { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) { |
|
|
|
|
let (min, max) = self.iter.size_hint(); |
|
|
|
|
( |
|
|
|
|
min * self.template.len(), |
|
|
|
|
max.map(|v| v * self.template.len()), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn get_triple_template_value<'a>( |
|
|
|
@ -3771,6 +3757,7 @@ pub enum ComparatorFunction { |
|
|
|
|
struct EncodedTupleSet { |
|
|
|
|
key: Vec<usize>, |
|
|
|
|
map: HashMap<u64, Vec<EncodedTuple>>, |
|
|
|
|
len: usize, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl EncodedTupleSet { |
|
|
|
@ -3778,6 +3765,7 @@ impl EncodedTupleSet { |
|
|
|
|
Self { |
|
|
|
|
key, |
|
|
|
|
map: HashMap::new(), |
|
|
|
|
len: 0, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
fn insert(&mut self, tuple: EncodedTuple) { |
|
|
|
@ -3785,6 +3773,7 @@ impl EncodedTupleSet { |
|
|
|
|
.entry(self.tuple_key(&tuple)) |
|
|
|
|
.or_default() |
|
|
|
|
.push(tuple); |
|
|
|
|
self.len += 1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn get(&self, tuple: &EncodedTuple) -> &[EncodedTuple] { |
|
|
|
@ -3800,6 +3789,20 @@ impl EncodedTupleSet { |
|
|
|
|
} |
|
|
|
|
hasher.finish() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn len(&self) -> usize { |
|
|
|
|
self.len |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Extend<EncodedTuple> for EncodedTupleSet { |
|
|
|
|
fn extend<T: IntoIterator<Item = EncodedTuple>>(&mut self, iter: T) { |
|
|
|
|
let iter = iter.into_iter(); |
|
|
|
|
self.map.reserve(iter.size_hint().0); |
|
|
|
|
for tuple in iter { |
|
|
|
|
self.insert(tuple); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|