|
|
|
@ -245,17 +245,17 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
})), |
|
|
|
|
PlanNode::Join { left, right } => { |
|
|
|
|
//TODO: very dumb implementation
|
|
|
|
|
let left_iter = self.eval_plan(&*left, from.clone()); |
|
|
|
|
let mut left_values = Vec::with_capacity(left_iter.size_hint().0); |
|
|
|
|
let mut errors = Vec::default(); |
|
|
|
|
for result in left_iter { |
|
|
|
|
match result { |
|
|
|
|
Ok(result) => { |
|
|
|
|
left_values.push(result); |
|
|
|
|
let left_values = self |
|
|
|
|
.eval_plan(&*left, from.clone()) |
|
|
|
|
.filter_map(|result| match result { |
|
|
|
|
Ok(result) => Some(result), |
|
|
|
|
Err(error) => { |
|
|
|
|
errors.push(Err(error)); |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
Err(error) => errors.push(Err(error)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
Box::new(JoinIterator { |
|
|
|
|
left: left_values, |
|
|
|
|
right_iter: self.eval_plan(&*right, from), |
|
|
|
@ -332,17 +332,17 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
})) |
|
|
|
|
} |
|
|
|
|
PlanNode::Sort { child, by } => { |
|
|
|
|
let iter = self.eval_plan(&*child, from); |
|
|
|
|
let mut values = Vec::with_capacity(iter.size_hint().0); |
|
|
|
|
let mut errors = Vec::default(); |
|
|
|
|
for result in iter { |
|
|
|
|
match result { |
|
|
|
|
Ok(result) => { |
|
|
|
|
values.push(result); |
|
|
|
|
let mut values = self |
|
|
|
|
.eval_plan(&*child, from) |
|
|
|
|
.filter_map(|result| match result { |
|
|
|
|
Ok(result) => Some(result), |
|
|
|
|
Err(error) => { |
|
|
|
|
errors.push(Err(error)); |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
Err(error) => errors.push(Err(error)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.collect::<Vec<_>>(); |
|
|
|
|
values.sort_unstable_by(|a, b| { |
|
|
|
|
for comp in by { |
|
|
|
|
match comp { |
|
|
|
@ -374,14 +374,20 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
Box::new(self.eval_plan(&*child, from).take(*count)) |
|
|
|
|
} |
|
|
|
|
PlanNode::Project { child, mapping } => { |
|
|
|
|
Box::new(self.eval_plan(&*child, from).map(move |tuple| { |
|
|
|
|
let tuple = tuple?; |
|
|
|
|
let mut new_tuple = Vec::with_capacity(mapping.len()); |
|
|
|
|
for key in mapping { |
|
|
|
|
new_tuple.push(tuple[*key]); |
|
|
|
|
} |
|
|
|
|
Ok(new_tuple) |
|
|
|
|
})) |
|
|
|
|
//TODO: use from somewhere?
|
|
|
|
|
Box::new( |
|
|
|
|
self.eval_plan(&*child, vec![None; mapping.len()]) |
|
|
|
|
.map(move |tuple| { |
|
|
|
|
let tuple = tuple?; |
|
|
|
|
let mut output_tuple = vec![None; from.len()]; |
|
|
|
|
for (input_key, output_key) in mapping.iter() { |
|
|
|
|
if let Some(value) = tuple[*input_key] { |
|
|
|
|
put_value(*output_key, value, &mut output_tuple) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(output_tuple) |
|
|
|
|
}), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1495,19 +1501,18 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
'a: 'b, |
|
|
|
|
{ |
|
|
|
|
let eval = self; |
|
|
|
|
let tuple_size = variables.len(); |
|
|
|
|
BindingsIterator::new( |
|
|
|
|
variables, |
|
|
|
|
Box::new(iter.map(move |values| { |
|
|
|
|
let encoder = eval.dataset.encoder(); |
|
|
|
|
values? |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|value| { |
|
|
|
|
Ok(match value { |
|
|
|
|
Some(term) => Some(encoder.decode_term(term)?), |
|
|
|
|
None => None, |
|
|
|
|
}) |
|
|
|
|
}) |
|
|
|
|
.collect() |
|
|
|
|
let mut result = vec![None; tuple_size]; |
|
|
|
|
for (i, value) in values?.into_iter().enumerate() { |
|
|
|
|
if let Some(term) = value { |
|
|
|
|
result[i] = Some(encoder.decode_term(term)?) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(result) |
|
|
|
|
})), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
@ -2140,9 +2145,6 @@ impl<'a, S: StoreConnection> Iterator for ConstructIterator<'a, S> { |
|
|
|
|
) { |
|
|
|
|
self.buffered_results |
|
|
|
|
.push(decode_triple(&encoder, subject, predicate, object)); |
|
|
|
|
} else { |
|
|
|
|
self.buffered_results.clear(); //No match, we do not output any triple for this row
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.bnodes.clear(); //We do not reuse old bnodes
|
|
|
|
|