Implements SPARQL aggregation

pull/10/head
Tpt 5 years ago
parent 52461f3242
commit d68d680436
  1. 3
      lib/src/sparql/algebra.rs
  2. 473
      lib/src/sparql/eval.rs
  3. 37
      lib/src/sparql/parser.rs
  4. 34
      lib/src/sparql/plan.rs
  5. 138
      lib/src/sparql/plan_builder.rs
  6. 21
      lib/src/sparql/sparql_grammar.rustpeg
  7. 24
      lib/src/store/numeric_encoder.rs
  8. 9
      lib/tests/sparql_test_cases.rs

@ -820,6 +820,7 @@ impl<'a> fmt::Display for SparqlGraphPattern<'a> {
"{{ SELECT {} WHERE {{ {} }} GROUP BY {} }}",
agg.iter()
.map(|(a, v)| format!("({} AS {})", SparqlAggregation(&a), v))
.chain(group.iter().map(|e| e.to_string()))
.collect::<Vec<String>>()
.join(" "),
SparqlGraphPattern(&*p),
@ -919,7 +920,7 @@ impl<'a> fmt::Display for SparqlGraphRootPattern<'a> {
}
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)]
pub struct GroupPattern(pub Vec<Expression>, pub Box<GraphPattern>);
pub struct GroupPattern(pub Vec<Variable>, pub Box<GraphPattern>);
impl fmt::Display for GroupPattern {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

@ -23,8 +23,7 @@ use sha1::Sha1;
use sha2::{Sha256, Sha384, Sha512};
use std::cmp::min;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryInto;
use std::fmt::Write;
use std::hash::Hash;
@ -389,6 +388,119 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
}),
)
}
PlanNode::Aggregate {
child,
key_mapping,
aggregates,
} => {
let tuple_size = from.len(); //TODO: not nice
let mut errors = Vec::default();
let mut accumulators_for_group =
HashMap::<Vec<Option<EncodedTerm>>, Vec<Box<dyn Accumulator>>>::default();
self.eval_plan(child, from)
.filter_map(|result| match result {
Ok(result) => Some(result),
Err(error) => {
errors.push(error);
None
}
})
.for_each(|tuple| {
//TODO avoid copy for key?
let key = (0..key_mapping.len())
.map(|v| get_tuple_value(v, &tuple))
.collect();
let key_accumulators =
accumulators_for_group.entry(key).or_insert_with(|| {
aggregates
.iter()
.map(|(aggregate, _)| {
self.accumulator_for_aggregate(
&aggregate.function,
aggregate.distinct,
)
})
.collect::<Vec<_>>()
});
for (i, accumulator) in key_accumulators.iter_mut().enumerate() {
let (aggregate, _) = &aggregates[i];
accumulator.add(
aggregate
.parameter
.as_ref()
.and_then(|parameter| self.eval_expression(&parameter, &tuple)),
);
}
});
if accumulators_for_group.is_empty() {
// There is always at least one group
accumulators_for_group.insert(vec![None; key_mapping.len()], Vec::default());
}
Box::new(
errors
.into_iter()
.map(Err)
.chain(accumulators_for_group.into_iter().map(
move |(key, accumulators)| {
let mut result = vec![None; tuple_size];
for (from_position, to_position) in key_mapping.iter().enumerate() {
if let Some(value) = key[from_position] {
put_value(*to_position, value, &mut result);
}
}
for (i, accumulator) in accumulators.into_iter().enumerate() {
if let Some(value) = accumulator.state() {
put_value(aggregates[i].1, value, &mut result);
}
}
Ok(result)
},
)),
)
}
}
}
fn accumulator_for_aggregate<'b>(
&'b self,
function: &'b PlanAggregationFunction,
distinct: bool,
) -> Box<dyn Accumulator + 'b> {
match function {
PlanAggregationFunction::Count => {
if distinct {
Box::new(DistinctAccumulator::new(CountAccumulator::default()))
} else {
Box::new(CountAccumulator::default())
}
}
PlanAggregationFunction::Sum => {
if distinct {
Box::new(DistinctAccumulator::new(SumAccumulator::default()))
} else {
Box::new(SumAccumulator::default())
}
}
PlanAggregationFunction::Min => Box::new(MinAccumulator::new(self)), // DISTINCT does not make sense with min
PlanAggregationFunction::Max => Box::new(MaxAccumulator::new(self)), // DISTINCT does not make sense with max
PlanAggregationFunction::Avg => {
if distinct {
Box::new(DistinctAccumulator::new(AvgAccumulator::default()))
} else {
Box::new(AvgAccumulator::default())
}
}
PlanAggregationFunction::Sample => Box::new(SampleAccumulator::default()), // DISTINCT does not make sense with sample
PlanAggregationFunction::GroupConcat { separator } => {
if distinct {
Box::new(DistinctAccumulator::new(GroupConcatAccumulator::new(
self, separator,
)))
} else {
Box::new(GroupConcatAccumulator::new(self, separator))
}
}
}
}
@ -680,7 +792,9 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
PlanExpression::Div(a, b) => Some(match self.parse_numeric_operands(a, b, tuple)? {
NumericBinaryOperands::Float(v1, v2) => (v1 / v2).into(),
NumericBinaryOperands::Double(v1, v2) => (v1 / v2).into(),
NumericBinaryOperands::Integer(v1, v2) => v1.checked_div(v2)?.into(),
NumericBinaryOperands::Integer(v1, v2) => Decimal::from_i128(v1)?
.checked_div(Decimal::from_i128(v2)?)?
.into(),
NumericBinaryOperands::Decimal(v1, v2) => v1.checked_div(v2)?.into(),
}),
PlanExpression::UnaryPlus(e) => match self.eval_expression(e, tuple)? {
@ -1436,60 +1550,10 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
e2: &PlanExpression,
tuple: &[Option<EncodedTerm>],
) -> Option<NumericBinaryOperands> {
match (
NumericBinaryOperands::new(
self.eval_expression(&e1, tuple)?,
self.eval_expression(&e2, tuple)?,
) {
(EncodedTerm::FloatLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Float(*v1, v2.to_f32()?))
}
(EncodedTerm::FloatLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(v1.to_f64()?, *v2))
}
(EncodedTerm::FloatLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Float(*v1, v2.to_f32()?))
}
(EncodedTerm::FloatLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Float(*v1, v2.to_f32()?))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, v2.to_f64()?))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, *v2))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, v2.to_f64()?))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, v2.to_f64()?))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Float(v1.to_f32()?, *v2))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(v1.to_f64()?, *v2))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Integer(v1, v2))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Decimal(Decimal::from_i128(v1)?, v2))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Float(v1.to_f32()?, *v2))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(v1.to_f64()?, *v2))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Decimal(v1, Decimal::from_i128(v2)?))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Decimal(v1, v2))
}
_ => None,
}
)
}
fn decode_bindings<'b>(
@ -1635,10 +1699,14 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
tuple_b: &[Option<EncodedTerm>],
expression: &PlanExpression,
) -> Ordering {
match (
self.cmp_terms(
self.eval_expression(expression, tuple_a),
self.eval_expression(expression, tuple_b),
) {
)
}
fn cmp_terms(&self, a: Option<EncodedTerm>, b: Option<EncodedTerm>) -> Ordering {
match (a, b) {
(Some(a), Some(b)) => match a {
EncodedTerm::BlankNode(a) => {
if let EncodedTerm::BlankNode(b) = b {
@ -1855,6 +1923,62 @@ enum NumericBinaryOperands {
Decimal(Decimal, Decimal),
}
impl NumericBinaryOperands {
fn new(a: EncodedTerm, b: EncodedTerm) -> Option<Self> {
match (a, b) {
(EncodedTerm::FloatLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Float(*v1, v2.to_f32()?))
}
(EncodedTerm::FloatLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(v1.to_f64()?, *v2))
}
(EncodedTerm::FloatLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Float(*v1, v2.to_f32()?))
}
(EncodedTerm::FloatLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Float(*v1, v2.to_f32()?))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, v2.to_f64()?))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, *v2))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, v2.to_f64()?))
}
(EncodedTerm::DoubleLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Double(*v1, v2.to_f64()?))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Float(v1.to_f32()?, *v2))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(v1.to_f64()?, *v2))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Integer(v1, v2))
}
(EncodedTerm::IntegerLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Decimal(Decimal::from_i128(v1)?, v2))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::FloatLiteral(v2)) => {
Some(NumericBinaryOperands::Float(v1.to_f32()?, *v2))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::DoubleLiteral(v2)) => {
Some(NumericBinaryOperands::Double(v1.to_f64()?, *v2))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::IntegerLiteral(v2)) => {
Some(NumericBinaryOperands::Decimal(v1, Decimal::from_i128(v2)?))
}
(EncodedTerm::DecimalLiteral(v1), EncodedTerm::DecimalLiteral(v2)) => {
Some(NumericBinaryOperands::Decimal(v1, v2))
}
_ => None,
}
}
}
fn get_tuple_value(variable: usize, tuple: &[Option<EncodedTerm>]) -> Option<EncodedTerm> {
if variable < tuple.len() {
tuple[variable]
@ -2359,3 +2483,238 @@ impl<T, O, I: Iterator<Item = Result<T>>, F: FnMut(T) -> U, U: IntoIterator<Item
}
}
}
trait Accumulator {
fn add(&mut self, element: Option<EncodedTerm>);
fn state(&self) -> Option<EncodedTerm>;
}
#[derive(Default, Debug)]
struct DistinctAccumulator<T: Accumulator> {
seen: HashSet<Option<EncodedTerm>>,
inner: T,
}
impl<T: Accumulator> DistinctAccumulator<T> {
fn new(inner: T) -> Self {
Self {
seen: HashSet::default(),
inner,
}
}
}
impl<T: Accumulator> Accumulator for DistinctAccumulator<T> {
fn add(&mut self, element: Option<EncodedTerm>) {
if self.seen.insert(element) {
self.inner.add(element)
}
}
fn state(&self) -> Option<EncodedTerm> {
self.inner.state()
}
}
#[derive(Default, Debug)]
struct CountAccumulator {
count: u64,
}
impl Accumulator for CountAccumulator {
fn add(&mut self, _element: Option<EncodedTerm>) {
self.count += 1;
}
fn state(&self) -> Option<EncodedTerm> {
Some(self.count.into())
}
}
#[derive(Debug)]
struct SumAccumulator {
sum: Option<EncodedTerm>,
}
impl Default for SumAccumulator {
fn default() -> Self {
Self {
sum: Some(0.into()),
}
}
}
impl Accumulator for SumAccumulator {
fn add(&mut self, element: Option<EncodedTerm>) {
if let Some(sum) = self.sum {
if let Some(operands) = element.and_then(|e| NumericBinaryOperands::new(sum, e)) {
//TODO: unify with addition?
self.sum = match operands {
NumericBinaryOperands::Float(v1, v2) => Some((v1 + v2).into()),
NumericBinaryOperands::Double(v1, v2) => Some((v1 + v2).into()),
NumericBinaryOperands::Integer(v1, v2) => v1.checked_add(v2).map(|v| v.into()),
NumericBinaryOperands::Decimal(v1, v2) => v1.checked_add(v2).map(|v| v.into()),
};
} else {
self.sum = None;
}
}
}
fn state(&self) -> Option<EncodedTerm> {
self.sum
}
}
#[derive(Debug, Default)]
struct AvgAccumulator {
sum: SumAccumulator,
count: CountAccumulator,
}
impl Accumulator for AvgAccumulator {
fn add(&mut self, element: Option<EncodedTerm>) {
self.sum.add(element);
self.count.add(element);
}
fn state(&self) -> Option<EncodedTerm> {
let sum = self.sum.state()?;
let count = self.count.state()?;
if count == EncodedTerm::from(0) {
Some(0.into())
} else {
//TODO: deduplicate?
match NumericBinaryOperands::new(sum, count)? {
NumericBinaryOperands::Float(v1, v2) => Some((v1 / v2).into()),
NumericBinaryOperands::Double(v1, v2) => Some((v1 / v2).into()),
NumericBinaryOperands::Integer(v1, v2) => Decimal::from_i128(v1)?
.checked_div(Decimal::from_i128(v2)?)
.map(|v| v.into()),
NumericBinaryOperands::Decimal(v1, v2) => v1.checked_div(v2).map(|v| v.into()),
}
}
}
}
struct MinAccumulator<'a, S: StoreConnection + 'a> {
eval: &'a SimpleEvaluator<S>,
min: Option<Option<EncodedTerm>>,
}
impl<'a, S: StoreConnection + 'a> MinAccumulator<'a, S> {
fn new(eval: &'a SimpleEvaluator<S>) -> Self {
Self { eval, min: None }
}
}
impl<'a, S: StoreConnection + 'a> Accumulator for MinAccumulator<'a, S> {
fn add(&mut self, element: Option<EncodedTerm>) {
if let Some(min) = self.min {
if self.eval.cmp_terms(element, min) == Ordering::Less {
self.min = Some(element)
}
} else {
self.min = Some(element)
}
}
fn state(&self) -> Option<EncodedTerm> {
self.min.and_then(|v| v)
}
}
struct MaxAccumulator<'a, S: StoreConnection + 'a> {
eval: &'a SimpleEvaluator<S>,
max: Option<Option<EncodedTerm>>,
}
impl<'a, S: StoreConnection + 'a> MaxAccumulator<'a, S> {
fn new(eval: &'a SimpleEvaluator<S>) -> Self {
Self { eval, max: None }
}
}
impl<'a, S: StoreConnection + 'a> Accumulator for MaxAccumulator<'a, S> {
fn add(&mut self, element: Option<EncodedTerm>) {
if let Some(max) = self.max {
if self.eval.cmp_terms(element, max) == Ordering::Greater {
self.max = Some(element)
}
} else {
self.max = Some(element)
}
}
fn state(&self) -> Option<EncodedTerm> {
self.max.and_then(|v| v)
}
}
#[derive(Default, Debug)]
struct SampleAccumulator {
value: Option<EncodedTerm>,
}
impl Accumulator for SampleAccumulator {
fn add(&mut self, element: Option<EncodedTerm>) {
if element.is_some() {
self.value = element
}
}
fn state(&self) -> Option<EncodedTerm> {
self.value
}
}
struct GroupConcatAccumulator<'a, S: StoreConnection + 'a> {
eval: &'a SimpleEvaluator<S>,
concat: Option<String>,
language: Option<Option<u64>>,
separator: &'a str,
}
impl<'a, S: StoreConnection + 'a> GroupConcatAccumulator<'a, S> {
fn new(eval: &'a SimpleEvaluator<S>, separator: &'a str) -> Self {
Self {
eval,
concat: Some("".to_owned()),
language: None,
separator,
}
}
}
impl<'a, S: StoreConnection + 'a> Accumulator for GroupConcatAccumulator<'a, S> {
fn add(&mut self, element: Option<EncodedTerm>) {
if let Some(concat) = self.concat.as_mut() {
let element = if let Some(element) = element {
self.eval.to_string_and_language(element)
} else {
None
};
if let Some((value, e_language)) = element {
if let Some(lang) = self.language {
if lang != e_language {
self.language = Some(None)
}
concat.push_str(self.separator);
} else {
self.language = Some(e_language)
}
concat.push_str(&value);
} else {
self.concat = None;
}
}
}
fn state(&self) -> Option<EncodedTerm> {
self.concat.as_ref().and_then(|result| {
self.eval
.build_plain_literal(result, self.language.and_then(|v| v))
})
}
}

@ -207,7 +207,7 @@ mod grammar {
fn build_select(
select: Selection,
wher: GraphPattern,
group: Option<(Vec<Expression>, Vec<(Expression, Variable)>)>,
mut group: Option<(Vec<Variable>, Vec<(Expression, Variable)>)>,
having: Option<Expression>,
order_by: Option<Vec<OrderComparator>>,
offset_limit: Option<(usize, Option<usize>)>,
@ -217,21 +217,22 @@ mod grammar {
let mut p = wher;
//GROUP BY
let aggregations = state.aggregations.pop().unwrap_or_else(BTreeMap::default);
if group.is_none() && !aggregations.is_empty() {
let const_variable = Variable::default();
group = Some((
vec![const_variable.clone()],
vec![(Literal::from(1).into(), const_variable)],
));
}
if let Some((clauses, binds)) = group {
for (e, v) in binds {
p = GraphPattern::Extend(Box::new(p), v, e);
}
let g = GroupPattern(clauses, Box::new(p));
p = GraphPattern::AggregateJoin(g, state.aggregations.clone());
state.aggregations = BTreeMap::default();
p = GraphPattern::AggregateJoin(g, aggregations);
}
if !state.aggregations.is_empty() {
let g = GroupPattern(vec![Literal::from(1).into()], Box::new(p));
p = GraphPattern::AggregateJoin(g, state.aggregations.clone());
state.aggregations = BTreeMap::default();
}
//TODO: not aggregated vars
//HAVING
if let Some(ex) = having {
@ -297,7 +298,7 @@ mod grammar {
namespaces: HashMap<String, String>,
bnodes_map: BTreeMap<String, BlankNode>,
used_bnodes: BTreeSet<String>,
aggregations: BTreeMap<Aggregation, Variable>,
aggregations: Vec<BTreeMap<Aggregation, Variable>>,
}
impl ParserState {
@ -309,12 +310,16 @@ mod grammar {
}
}
fn new_aggregation(&mut self, agg: Aggregation) -> Variable {
self.aggregations.get(&agg).cloned().unwrap_or_else(|| {
fn new_aggregation(&mut self, agg: Aggregation) -> Result<Variable, &'static str> {
let aggregations = self
.aggregations
.last_mut()
.ok_or_else(|| "Unexpected aggregate")?;
Ok(aggregations.get(&agg).cloned().unwrap_or_else(|| {
let new_var = Variable::default();
self.aggregations.insert(agg, new_var.clone());
aggregations.insert(agg, new_var.clone());
new_var
})
}))
}
}
@ -546,7 +551,7 @@ mod grammar {
namespaces: HashMap::default(),
bnodes_map: BTreeMap::default(),
used_bnodes: BTreeSet::default(),
aggregations: BTreeMap::default(),
aggregations: Vec::default(),
};
Ok(QueryUnit(&unescape_unicode_codepoints(query), &mut state)?)

@ -68,6 +68,12 @@ pub enum PlanNode {
child: Box<PlanNode>,
mapping: Vec<(usize, usize)>, // pairs of (variable key in child, variable key in output)
},
Aggregate {
// By definition the group by key are the range 0..key_mapping.len()
child: Box<PlanNode>,
key_mapping: Vec<usize>, //index of the new key for each old key (that is the vec key)
aggregates: Vec<(PlanAggregation, usize)>,
},
}
impl PlanNode {
@ -159,6 +165,16 @@ impl PlanNode {
set.insert(i);
}
}
PlanNode::Aggregate {
key_mapping,
aggregates,
..
} => {
set.extend(key_mapping);
for (_, var) in aggregates {
set.insert(*var);
}
}
}
}
}
@ -389,6 +405,24 @@ impl PlanExpression {
}
}
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub struct PlanAggregation {
pub function: PlanAggregationFunction,
pub parameter: Option<PlanExpression>,
pub distinct: bool,
}
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum PlanAggregationFunction {
Count,
Sum,
Min,
Max,
Avg,
Sample,
GroupConcat { separator: String },
}
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub enum PlanPropertyPath {
PredicatePath(EncodedTerm),

@ -37,7 +37,7 @@ impl<'a, S: StoreConnection> PlanBuilder<'a, S> {
fn build_for_graph_pattern(
&self,
pattern: &GraphPattern,
input: PlanNode,
input: PlanNode, //TODO: is this parameter really useful?
variables: &mut Vec<Variable>,
graph_name: PatternValue,
) -> Result<PlanNode> {
@ -128,7 +128,33 @@ impl<'a, S: StoreConnection> PlanBuilder<'a, S> {
"SPARQL SERVICE clauses are not implemented yet"
))
}
GraphPattern::AggregateJoin(_g, _a) => unimplemented!(),
GraphPattern::AggregateJoin(GroupPattern(key, p), aggregates) => {
let mut inner_variables = key.clone();
let inner_graph_name =
self.convert_pattern_value_id(graph_name, variables, &mut inner_variables);
PlanNode::Aggregate {
child: Box::new(self.build_for_graph_pattern(
p,
input,
&mut inner_variables,
inner_graph_name,
)?),
key_mapping: key
.iter()
.map(|k| variable_key(&mut inner_variables, k))
.collect(),
aggregates: aggregates
.iter()
.map(|(a, v)| {
Ok((
self.build_for_aggregate(a, &mut inner_variables, graph_name)?,
variable_key(variables, v),
))
})
.collect::<Result<Vec<_>>>()?,
}
}
GraphPattern::Data(bs) => PlanNode::StaticBindings {
tuples: self.encode_bindings(bs, variables)?,
},
@ -151,20 +177,8 @@ impl<'a, S: StoreConnection> PlanBuilder<'a, S> {
}
GraphPattern::Project(l, new_variables) => {
let mut inner_variables = new_variables.clone();
let inner_graph_name = match graph_name {
PatternValue::Constant(graph_name) => PatternValue::Constant(graph_name),
PatternValue::Variable(graph_name) => PatternValue::Variable(
new_variables
.iter()
.enumerate()
.find(|(_, var)| *var == &variables[graph_name])
.map(|(new_key, _)| new_key)
.unwrap_or_else(|| {
inner_variables.push(Variable::default());
inner_variables.len() - 1
}),
),
};
let inner_graph_name =
self.convert_pattern_value_id(graph_name, variables, &mut inner_variables);
PlanNode::Project {
child: Box::new(self.build_for_graph_pattern(
l,
@ -731,15 +745,18 @@ impl<'a, S: StoreConnection> PlanBuilder<'a, S> {
variables: &mut Vec<Variable>,
) -> Result<Vec<EncodedTuple>> {
let encoder = self.store.encoder();
let bindings_variables = bindings.variables();
let bindings_variables_keys = bindings
.variables()
.iter()
.map(|v| variable_key(variables, v))
.collect::<Vec<_>>();
bindings
.values_iter()
.map(move |values| {
let mut result = vec![None; variables.len()];
for (key, value) in values.iter().enumerate() {
if let Some(term) = value {
result[variable_key(variables, &bindings_variables[key])] =
Some(encoder.encode_term(term)?);
result[bindings_variables_keys[key]] = Some(encoder.encode_term(term)?);
}
}
Ok(result)
@ -747,6 +764,56 @@ impl<'a, S: StoreConnection> PlanBuilder<'a, S> {
.collect()
}
fn build_for_aggregate(
&self,
aggregate: &Aggregation,
variables: &mut Vec<Variable>,
graph_name: PatternValue,
) -> Result<PlanAggregation> {
Ok(match aggregate {
Aggregation::Count(e, distinct) => PlanAggregation {
function: PlanAggregationFunction::Count,
parameter: match e {
Some(e) => Some(self.build_for_expression(&e, variables, graph_name)?),
None => None,
},
distinct: *distinct,
},
Aggregation::Sum(e, distinct) => PlanAggregation {
function: PlanAggregationFunction::Sum,
parameter: Some(self.build_for_expression(&e, variables, graph_name)?),
distinct: *distinct,
},
Aggregation::Min(e, distinct) => PlanAggregation {
function: PlanAggregationFunction::Min,
parameter: Some(self.build_for_expression(&e, variables, graph_name)?),
distinct: *distinct,
},
Aggregation::Max(e, distinct) => PlanAggregation {
function: PlanAggregationFunction::Max,
parameter: Some(self.build_for_expression(&e, variables, graph_name)?),
distinct: *distinct,
},
Aggregation::Avg(e, distinct) => PlanAggregation {
function: PlanAggregationFunction::Avg,
parameter: Some(self.build_for_expression(&e, variables, graph_name)?),
distinct: *distinct,
},
Aggregation::Sample(e, distinct) => PlanAggregation {
function: PlanAggregationFunction::Sample,
parameter: Some(self.build_for_expression(&e, variables, graph_name)?),
distinct: *distinct,
},
Aggregation::GroupConcat(e, distinct, separator) => PlanAggregation {
function: PlanAggregationFunction::GroupConcat {
separator: separator.clone().unwrap_or_else(|| " ".to_string()),
},
parameter: Some(self.build_for_expression(&e, variables, graph_name)?),
distinct: *distinct,
},
})
}
fn build_for_graph_template(
&self,
template: &[TriplePattern],
@ -816,6 +883,39 @@ impl<'a, S: StoreConnection> PlanBuilder<'a, S> {
}
})
}
fn convert_pattern_value_id(
&self,
from_value: PatternValue,
from: &[Variable],
to: &mut Vec<Variable>,
) -> PatternValue {
match from_value {
PatternValue::Constant(v) => PatternValue::Constant(v),
PatternValue::Variable(from_id) => {
PatternValue::Variable(self.convert_variable_id(from_id, from, to))
}
}
}
fn convert_variable_id(
&self,
from_id: usize,
from: &[Variable],
to: &mut Vec<Variable>,
) -> usize {
if let Some(to_id) = to
.iter()
.enumerate()
.find(|(_, var)| *var == &from[from_id])
.map(|(to_id, _)| to_id)
{
to_id
} else {
to.push(Variable::default());
to.len() - 1
}
}
}
fn variable_key(variables: &mut Vec<Variable>, variable: &Variable) -> usize {

@ -43,12 +43,15 @@ SubSelect -> GraphPattern = s:SelectClause _ w:WhereClause _ g:GroupClause? _ h:
}
//[9]
SelectClause -> Selection = "SELECT"i _ o:SelectClause_option _ v:SelectClause_variables {
SelectClause -> Selection = "SELECT"i _ Selection_init o:SelectClause_option _ v:SelectClause_variables {
Selection {
option: o,
variables: v
}
}
Selection_init = {
state.aggregations.push(BTreeMap::default())
}
SelectClause_option -> SelectionOption =
"DISTINCT"i { SelectionOption::Distinct } /
"REDUCED"i { SelectionOption::Reduced } /
@ -143,15 +146,15 @@ WhereClause -> GraphPattern = "WHERE"i? _ p:GroupGraphPattern {
}
//[19]
GroupClause -> (Vec<Expression>, Vec<(Expression,Variable)>) = "GROUP"i _ "BY"i _ c:GroupCondition_item+ {
GroupClause -> (Vec<Variable>, Vec<(Expression,Variable)>) = "GROUP"i _ "BY"i _ c:GroupCondition_item+ {
let mut projections: Vec<(Expression,Variable)> = Vec::default();
let clauses = c.into_iter().map(|(e, vo)| {
match vo {
Some(v) => {
projections.push((e, v.clone()));
v.into()
},
None => e
if let Expression::Constant(TermOrVariable::Variable(v)) = e {
v
} else {
let v = vo.unwrap_or_else(Variable::default);
projections.push((e, v.clone()));
v
}
}).collect();
(clauses, projections)
@ -771,7 +774,7 @@ BrackettedExpression -> Expression = '(' _ e:Expression _ ')' { e }
//[121]
BuiltInCall -> Expression =
a:Aggregate { state.new_aggregation(a).into() } /
a:Aggregate {? state.new_aggregation(a).map(|v| v.into()) } /
"STR"i _ '(' _ e:Expression _ ')' { Expression::FunctionCall(Function::Str, vec![e]) } /
"LANG"i _ '(' _ e:Expression _ ')' { Expression::FunctionCall(Function::Lang, vec![e]) } /
"LANGMATCHES"i _ '(' _ a:Expression _ ',' _ b:Expression _ ')' { Expression::FunctionCall(Function::LangMatches, vec![a, b]) } /

@ -290,15 +290,15 @@ impl From<i128> for EncodedTerm {
}
}
impl From<f32> for EncodedTerm {
fn from(value: f32) -> Self {
EncodedTerm::FloatLiteral(value.into())
impl From<i64> for EncodedTerm {
fn from(value: i64) -> Self {
EncodedTerm::IntegerLiteral(value.into())
}
}
impl From<f64> for EncodedTerm {
fn from(value: f64) -> Self {
EncodedTerm::DoubleLiteral(value.into())
impl From<u64> for EncodedTerm {
fn from(value: u64) -> Self {
EncodedTerm::IntegerLiteral(value.into())
}
}
@ -314,6 +314,18 @@ impl From<u32> for EncodedTerm {
}
}
impl From<f32> for EncodedTerm {
fn from(value: f32) -> Self {
EncodedTerm::FloatLiteral(value.into())
}
}
impl From<f64> for EncodedTerm {
fn from(value: f64) -> Self {
EncodedTerm::DoubleLiteral(value.into())
}
}
impl From<Decimal> for EncodedTerm {
fn from(value: Decimal) -> Self {
EncodedTerm::DecimalLiteral(value)

@ -69,7 +69,7 @@ fn sparql_w3c_query_evaluation_testsuite() -> Result<()> {
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/bound/manifest.ttl",
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/cast/manifest.ttl",
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/construct/manifest.ttl",
// FROM and FROM NAMED "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/construct/manifest.ttl",
//TODO FROM and FROM NAMED "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/construct/manifest.ttl",
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/distinct/manifest.ttl",
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/expr-builtin/manifest.ttl",
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/expr-equals/manifest.ttl",
@ -87,11 +87,13 @@ fn sparql_w3c_query_evaluation_testsuite() -> Result<()> {
];
let manifest_11_urls = vec![
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/aggregates/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/bind/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/bindings/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/construct/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/exists/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/functions/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/grouping/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/negation/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/project-expression/manifest.ttl",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/property-path/manifest.ttl",
@ -129,8 +131,9 @@ fn sparql_w3c_query_evaluation_testsuite() -> Result<()> {
NamedNode::parse("http://www.w3.org/2009/sparql/docs/tests/data-sparql11/functions/manifest#coalesce01").unwrap(),
//Property path with unbound graph name are not supported yet
NamedNode::parse("http://www.w3.org/2009/sparql/docs/tests/data-sparql11/property-path/manifest#pp35").unwrap(),
//Aggregate in subquery (TODO when aggregates are implemented)
NamedNode::parse("http://www.w3.org/2009/sparql/docs/tests/data-sparql11/subquery/manifest#subquery08").unwrap(),
//We write "2"^^xsd:decimal instead of "2.0"^^xsd:decimal
NamedNode::parse("http://www.w3.org/2009/sparql/docs/tests/data-sparql11/aggregates/manifest#agg-err-02").unwrap(),
NamedNode::parse("http://www.w3.org/2009/sparql/docs/tests/data-sparql11/aggregates/manifest#agg-avg-02").unwrap()
];
let tests: Result<Vec<_>> = manifest_10_urls

Loading…
Cancel
Save