|
|
|
@ -26,11 +26,10 @@ use std::collections::BTreeMap; |
|
|
|
|
use std::collections::HashSet; |
|
|
|
|
use std::convert::TryInto; |
|
|
|
|
use std::fmt::Write; |
|
|
|
|
use std::iter::once; |
|
|
|
|
use std::iter::Iterator; |
|
|
|
|
use std::iter::{empty, once}; |
|
|
|
|
use std::ops::Deref; |
|
|
|
|
use std::str; |
|
|
|
|
use std::sync::Arc; |
|
|
|
|
use std::sync::Mutex; |
|
|
|
|
use std::u64; |
|
|
|
|
use uuid::Uuid; |
|
|
|
@ -39,11 +38,10 @@ const REGEX_SIZE_LIMIT: usize = 1_000_000; |
|
|
|
|
|
|
|
|
|
type EncodedTuplesIterator<'a> = Box<dyn Iterator<Item = Result<EncodedTuple>> + 'a>; |
|
|
|
|
|
|
|
|
|
#[derive(Clone)] |
|
|
|
|
pub struct SimpleEvaluator<S: StoreConnection> { |
|
|
|
|
dataset: DatasetView<S>, |
|
|
|
|
bnodes_map: Arc<Mutex<BTreeMap<u64, Uuid>>>, |
|
|
|
|
base_iri: Option<Arc<Iri<String>>>, |
|
|
|
|
bnodes_map: Mutex<BTreeMap<u64, Uuid>>, |
|
|
|
|
base_iri: Option<Iri<String>>, |
|
|
|
|
now: DateTime<FixedOffset>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -51,8 +49,8 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
pub fn new(dataset: S, base_iri: Option<Iri<String>>) -> Self { |
|
|
|
|
Self { |
|
|
|
|
dataset: DatasetView::new(dataset), |
|
|
|
|
bnodes_map: Arc::new(Mutex::new(BTreeMap::default())), |
|
|
|
|
base_iri: base_iri.map(Arc::new), |
|
|
|
|
bnodes_map: Mutex::new(BTreeMap::default()), |
|
|
|
|
base_iri, |
|
|
|
|
now: Utc::now().with_timezone(&FixedOffset::east(0)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -91,7 +89,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
'a: 'b, |
|
|
|
|
{ |
|
|
|
|
Ok(QueryResult::Graph(Box::new(ConstructIterator { |
|
|
|
|
dataset: self.dataset.clone(), |
|
|
|
|
eval: self, |
|
|
|
|
iter: self.eval_plan(plan, vec![]), |
|
|
|
|
template: construct, |
|
|
|
|
buffered_results: Vec::default(), |
|
|
|
@ -104,9 +102,9 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
'a: 'b, |
|
|
|
|
{ |
|
|
|
|
Ok(QueryResult::Graph(Box::new(DescribeIterator { |
|
|
|
|
dataset: self.dataset.clone(), |
|
|
|
|
eval: self, |
|
|
|
|
iter: self.eval_plan(plan, vec![]), |
|
|
|
|
quads: Vec::default(), |
|
|
|
|
quads: Box::new(empty()), |
|
|
|
|
}))) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -228,7 +226,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
let mut filtered_from = from.clone(); |
|
|
|
|
unbind_variables(&mut filtered_from, &problem_vars); |
|
|
|
|
let iter = LeftJoinIterator { |
|
|
|
|
eval: self.clone(), |
|
|
|
|
eval: self, |
|
|
|
|
right_plan: &*right, |
|
|
|
|
left_iter: self.eval_plan(&*left, filtered_from), |
|
|
|
|
current_right: Vec::default(), |
|
|
|
@ -244,7 +242,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
PlanNode::Filter { child, expression } => { |
|
|
|
|
let eval = self.clone(); |
|
|
|
|
let eval = self; |
|
|
|
|
Box::new(self.eval_plan(&*child, from).filter(move |tuple| { |
|
|
|
|
match tuple { |
|
|
|
|
Ok(tuple) => eval |
|
|
|
@ -256,17 +254,19 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
})) |
|
|
|
|
} |
|
|
|
|
PlanNode::Union { entry, children } => Box::new(UnionIterator { |
|
|
|
|
eval: self.clone(), |
|
|
|
|
eval: self, |
|
|
|
|
children_plan: &children, |
|
|
|
|
input_iter: self.eval_plan(&*entry, from), |
|
|
|
|
current: Vec::default(), |
|
|
|
|
current_input: Vec::default(), |
|
|
|
|
current_iterator: Box::new(empty()), |
|
|
|
|
current_child: children.len(), |
|
|
|
|
}), |
|
|
|
|
PlanNode::Extend { |
|
|
|
|
child, |
|
|
|
|
position, |
|
|
|
|
expression, |
|
|
|
|
} => { |
|
|
|
|
let eval = self.clone(); |
|
|
|
|
let eval = self; |
|
|
|
|
Box::new(self.eval_plan(&*child, from).map(move |tuple| { |
|
|
|
|
let mut tuple = tuple?; |
|
|
|
|
if let Some(value) = eval.eval_expression(&expression, &tuple) { |
|
|
|
@ -1275,11 +1275,11 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
where |
|
|
|
|
'a: 'b, |
|
|
|
|
{ |
|
|
|
|
let dataset = self.dataset.clone(); |
|
|
|
|
let eval = self; |
|
|
|
|
BindingsIterator::new( |
|
|
|
|
variables, |
|
|
|
|
Box::new(iter.map(move |values| { |
|
|
|
|
let encoder = dataset.encoder(); |
|
|
|
|
let encoder = eval.dataset.encoder(); |
|
|
|
|
values? |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|value| { |
|
|
|
@ -1531,17 +1531,16 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Clone)] |
|
|
|
|
struct DatasetView<S: StoreConnection> { |
|
|
|
|
store: S, |
|
|
|
|
extra: Arc<MemoryStringStore>, |
|
|
|
|
extra: MemoryStringStore, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl<S: StoreConnection> DatasetView<S> { |
|
|
|
|
fn new(store: S) -> Self { |
|
|
|
|
Self { |
|
|
|
|
store, |
|
|
|
|
extra: Arc::new(MemoryStringStore::default()), |
|
|
|
|
extra: MemoryStringStore::default(), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1751,19 +1750,20 @@ impl<'a> Iterator for JoinIterator<'a> { |
|
|
|
|
type Item = Result<EncodedTuple>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedTuple>> { |
|
|
|
|
if let Some(result) = self.buffered_results.pop() { |
|
|
|
|
return Some(result); |
|
|
|
|
} |
|
|
|
|
let right_tuple = match self.right_iter.next()? { |
|
|
|
|
Ok(right_tuple) => right_tuple, |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
}; |
|
|
|
|
for left_tuple in &self.left { |
|
|
|
|
if let Some(result_tuple) = combine_tuples(left_tuple, &right_tuple) { |
|
|
|
|
self.buffered_results.push(Ok(result_tuple)) |
|
|
|
|
loop { |
|
|
|
|
if let Some(result) = self.buffered_results.pop() { |
|
|
|
|
return Some(result); |
|
|
|
|
} |
|
|
|
|
let right_tuple = match self.right_iter.next()? { |
|
|
|
|
Ok(right_tuple) => right_tuple, |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
}; |
|
|
|
|
for left_tuple in &self.left { |
|
|
|
|
if let Some(result_tuple) = combine_tuples(left_tuple, &right_tuple) { |
|
|
|
|
self.buffered_results.push(Ok(result_tuple)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.next() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1793,7 +1793,7 @@ impl<'a> Iterator for AntiJoinIterator<'a> { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct LeftJoinIterator<'a, S: StoreConnection + 'a> { |
|
|
|
|
eval: SimpleEvaluator<S>, |
|
|
|
|
eval: &'a SimpleEvaluator<S>, |
|
|
|
|
right_plan: &'a PlanNode, |
|
|
|
|
left_iter: EncodedTuplesIterator<'a>, |
|
|
|
|
current_right: Vec<Result<EncodedTuple>>, //TODO: keep using an iterator?
|
|
|
|
@ -1861,29 +1861,37 @@ impl<'a, S: StoreConnection> Iterator for BadLeftJoinIterator<'a, S> { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct UnionIterator<'a, S: StoreConnection + 'a> { |
|
|
|
|
eval: SimpleEvaluator<S>, |
|
|
|
|
children_plan: &'a Vec<PlanNode>, |
|
|
|
|
eval: &'a SimpleEvaluator<S>, |
|
|
|
|
children_plan: &'a [PlanNode], |
|
|
|
|
input_iter: EncodedTuplesIterator<'a>, |
|
|
|
|
current: Vec<Result<EncodedTuple>>, //TODO: avoid
|
|
|
|
|
current_input: EncodedTuple, |
|
|
|
|
current_iterator: EncodedTuplesIterator<'a>, |
|
|
|
|
current_child: usize, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl<'a, S: StoreConnection> Iterator for UnionIterator<'a, S> { |
|
|
|
|
type Item = Result<EncodedTuple>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedTuple>> { |
|
|
|
|
if let Some(tuple) = self.current.pop() { |
|
|
|
|
return Some(tuple); |
|
|
|
|
} |
|
|
|
|
match self.input_iter.next()? { |
|
|
|
|
Ok(input_tuple) => { |
|
|
|
|
for plan in self.children_plan { |
|
|
|
|
self.current |
|
|
|
|
.extend(self.eval.eval_plan(plan, input_tuple.clone())); |
|
|
|
|
loop { |
|
|
|
|
if let Some(tuple) = self.current_iterator.next() { |
|
|
|
|
return Some(tuple); |
|
|
|
|
} |
|
|
|
|
if self.current_child == self.children_plan.len() { |
|
|
|
|
match self.input_iter.next()? { |
|
|
|
|
Ok(input_tuple) => { |
|
|
|
|
self.current_input = input_tuple; |
|
|
|
|
self.current_child = 0; |
|
|
|
|
} |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
self.current_iterator = self.eval.eval_plan( |
|
|
|
|
&self.children_plan[self.current_child], |
|
|
|
|
self.current_input.clone(), |
|
|
|
|
); |
|
|
|
|
self.current_child += 1; |
|
|
|
|
} |
|
|
|
|
self.next() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1896,21 +1904,21 @@ impl<'a> Iterator for HashDeduplicateIterator<'a> { |
|
|
|
|
type Item = Result<EncodedTuple>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedTuple>> { |
|
|
|
|
match self.iter.next()? { |
|
|
|
|
Ok(tuple) => { |
|
|
|
|
if self.already_seen.insert(tuple.clone()) { |
|
|
|
|
Some(Ok(tuple)) |
|
|
|
|
} else { |
|
|
|
|
self.next() |
|
|
|
|
loop { |
|
|
|
|
match self.iter.next()? { |
|
|
|
|
Ok(tuple) => { |
|
|
|
|
if self.already_seen.insert(tuple.clone()) { |
|
|
|
|
return Some(Ok(tuple)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
} |
|
|
|
|
Err(error) => Some(Err(error)), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct ConstructIterator<'a, S: StoreConnection> { |
|
|
|
|
dataset: DatasetView<S>, |
|
|
|
|
eval: &'a SimpleEvaluator<S>, |
|
|
|
|
iter: EncodedTuplesIterator<'a>, |
|
|
|
|
template: &'a [TripleTemplate], |
|
|
|
|
buffered_results: Vec<Result<Triple>>, |
|
|
|
@ -1921,31 +1929,32 @@ impl<'a, S: StoreConnection> Iterator for ConstructIterator<'a, S> { |
|
|
|
|
type Item = Result<Triple>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<Triple>> { |
|
|
|
|
if let Some(result) = self.buffered_results.pop() { |
|
|
|
|
return Some(result); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
let tuple = match self.iter.next()? { |
|
|
|
|
Ok(tuple) => tuple, |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
}; |
|
|
|
|
let encoder = self.dataset.encoder(); |
|
|
|
|
for template in self.template { |
|
|
|
|
if let (Some(subject), Some(predicate), Some(object)) = ( |
|
|
|
|
get_triple_template_value(&template.subject, &tuple, &mut self.bnodes), |
|
|
|
|
get_triple_template_value(&template.predicate, &tuple, &mut self.bnodes), |
|
|
|
|
get_triple_template_value(&template.object, &tuple, &mut self.bnodes), |
|
|
|
|
) { |
|
|
|
|
self.buffered_results |
|
|
|
|
.push(decode_triple(&encoder, subject, predicate, object)); |
|
|
|
|
} else { |
|
|
|
|
self.buffered_results.clear(); //No match, we do not output any triple for this row
|
|
|
|
|
break; |
|
|
|
|
loop { |
|
|
|
|
if let Some(result) = self.buffered_results.pop() { |
|
|
|
|
return Some(result); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
let tuple = match self.iter.next()? { |
|
|
|
|
Ok(tuple) => tuple, |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
}; |
|
|
|
|
let encoder = self.eval.dataset.encoder(); |
|
|
|
|
for template in self.template { |
|
|
|
|
if let (Some(subject), Some(predicate), Some(object)) = ( |
|
|
|
|
get_triple_template_value(&template.subject, &tuple, &mut self.bnodes), |
|
|
|
|
get_triple_template_value(&template.predicate, &tuple, &mut self.bnodes), |
|
|
|
|
get_triple_template_value(&template.object, &tuple, &mut self.bnodes), |
|
|
|
|
) { |
|
|
|
|
self.buffered_results |
|
|
|
|
.push(decode_triple(&encoder, subject, predicate, object)); |
|
|
|
|
} else { |
|
|
|
|
self.buffered_results.clear(); //No match, we do not output any triple for this row
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.bnodes.clear(); //We do not reuse old bnodes
|
|
|
|
|
} |
|
|
|
|
self.bnodes.clear(); //We do not reuse old bnodes
|
|
|
|
|
} |
|
|
|
|
self.next() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1980,38 +1989,40 @@ fn decode_triple<S: StringStore>( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct DescribeIterator<'a, S: StoreConnection + 'a> { |
|
|
|
|
dataset: DatasetView<S>, |
|
|
|
|
eval: &'a SimpleEvaluator<S>, |
|
|
|
|
iter: EncodedTuplesIterator<'a>, |
|
|
|
|
quads: Vec<Result<EncodedQuad>>, |
|
|
|
|
quads: Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl<'a, S: StoreConnection> Iterator for DescribeIterator<'a, S> { |
|
|
|
|
type Item = Result<Triple>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<Triple>> { |
|
|
|
|
if let Some(quad) = self.quads.pop() { |
|
|
|
|
return Some(match quad { |
|
|
|
|
Ok(quad) => self |
|
|
|
|
.dataset |
|
|
|
|
.encoder() |
|
|
|
|
.decode_quad(&quad) |
|
|
|
|
.map(|q| q.into_triple()), |
|
|
|
|
Err(error) => Err(error), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
let tuple = match self.iter.next()? { |
|
|
|
|
Ok(tuple) => tuple, |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
}; |
|
|
|
|
for subject in tuple { |
|
|
|
|
if let Some(subject) = subject { |
|
|
|
|
self.quads = self |
|
|
|
|
.dataset |
|
|
|
|
.quads_for_pattern(Some(subject), None, None, None) |
|
|
|
|
.collect(); |
|
|
|
|
loop { |
|
|
|
|
if let Some(quad) = self.quads.next() { |
|
|
|
|
return Some(match quad { |
|
|
|
|
Ok(quad) => self |
|
|
|
|
.eval |
|
|
|
|
.dataset |
|
|
|
|
.encoder() |
|
|
|
|
.decode_quad(&quad) |
|
|
|
|
.map(|q| q.into_triple()), |
|
|
|
|
Err(error) => Err(error), |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
let tuple = match self.iter.next()? { |
|
|
|
|
Ok(tuple) => tuple, |
|
|
|
|
Err(error) => return Some(Err(error)), |
|
|
|
|
}; |
|
|
|
|
for subject in tuple { |
|
|
|
|
if let Some(subject) = subject { |
|
|
|
|
self.quads = |
|
|
|
|
self.eval |
|
|
|
|
.dataset |
|
|
|
|
.quads_for_pattern(Some(subject), None, None, None); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self.next() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|