Makes SPARQL query result Send and Sync

pull/692/head
Tpt 10 months ago committed by Thomas Tanon
parent 4f404ab650
commit 899e553249
  1. 3
      lib/sparesults/src/csv.rs
  2. 13
      lib/sparesults/src/parser.rs
  3. 7
      lib/sparesults/src/solution.rs
  4. 27
      lib/src/sparql/eval.rs
  5. 30
      lib/src/sparql/mod.rs
  6. 20
      lib/src/sparql/model.rs
  7. 2
      lib/src/sparql/service.rs
  8. 3
      lib/src/sparql/update.rs
  9. 8
      lib/src/storage/numeric_encoder.rs

@ -628,7 +628,6 @@ mod tests {
use super::*;
use std::error::Error;
use std::rc::Rc;
fn build_example() -> (Vec<Variable>, Vec<Vec<Option<Term>>>) {
(
@ -682,7 +681,6 @@ mod tests {
let (variables, solutions) = build_example();
let mut buffer = String::new();
let writer = InnerCsvSolutionsWriter::start(&mut buffer, variables.clone());
let variables = Rc::new(variables);
for solution in solutions {
writer.write(
&mut buffer,
@ -702,7 +700,6 @@ mod tests {
// Write
let mut buffer = String::new();
let writer = InnerTsvSolutionsWriter::start(&mut buffer, variables.clone());
let variables = Rc::new(variables);
for solution in &solutions {
writer.write(
&mut buffer,

@ -6,7 +6,7 @@ use crate::solution::QuerySolution;
use crate::xml::{XmlQueryResultsReader, XmlSolutionsReader};
use oxrdf::Variable;
use std::io::Read;
use std::rc::Rc;
use std::sync::Arc;
/// Parsers for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats.
///
@ -81,7 +81,7 @@ impl QueryResultsParser {
solutions,
variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
variables: Rc::new(variables),
variables: variables.into(),
solutions: SolutionsReaderKind::Xml(solutions),
}),
},
@ -91,7 +91,7 @@ impl QueryResultsParser {
solutions,
variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
variables: Rc::new(variables),
variables: variables.into(),
solutions: SolutionsReaderKind::Json(solutions),
}),
},
@ -102,7 +102,7 @@ impl QueryResultsParser {
solutions,
variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
variables: Rc::new(variables),
variables: variables.into(),
solutions: SolutionsReaderKind::Tsv(solutions),
}),
},
@ -166,9 +166,8 @@ pub enum FromReadQueryResultsReader<R: Read> {
/// }
/// # Result::<(),sparesults::ParseError>::Ok(())
/// ```
#[allow(clippy::rc_buffer)]
pub struct FromReadSolutionsReader<R: Read> {
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
solutions: SolutionsReaderKind<R>,
}
@ -209,7 +208,7 @@ impl<R: Read> Iterator for FromReadSolutionsReader<R> {
SolutionsReaderKind::Tsv(reader) => reader.read_next(),
}
.transpose()?
.map(|values| (Rc::clone(&self.variables), values).into()),
.map(|values| (Arc::clone(&self.variables), values).into()),
)
}
}

@ -4,7 +4,7 @@ use oxrdf::{Term, Variable, VariableRef};
use std::fmt;
use std::iter::Zip;
use std::ops::Index;
use std::rc::Rc;
use std::sync::Arc;
/// Tuple associating variables and terms that are the result of a SPARQL query.
///
@ -18,9 +18,8 @@ use std::rc::Rc;
/// assert_eq!(solution.get("foo"), Some(&Literal::from(1).into())); // Get the value of the variable ?foo if it exists (here yes).
/// assert_eq!(solution.get(1), None); // Get the value of the second column if it exists (here no).
/// ```
#[allow(clippy::rc_buffer)]
pub struct QuerySolution {
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
values: Vec<Option<Term>>,
}
@ -116,7 +115,7 @@ impl QuerySolution {
}
}
impl<V: Into<Rc<Vec<Variable>>>, S: Into<Vec<Option<Term>>>> From<(V, S)> for QuerySolution {
impl<V: Into<Arc<[Variable]>>, S: Into<Vec<Option<Term>>>> From<(V, S)> for QuerySolution {
#[inline]
fn from((v, s): (V, S)) -> Self {
Self {

@ -1,10 +1,11 @@
use crate::model::vocab::{rdf, xsd};
use crate::model::{BlankNode, LiteralRef, NamedNode, NamedNodeRef, Term, Triple};
use crate::model::{BlankNode, LiteralRef, NamedNodeRef, Term, Triple};
use crate::sparql::algebra::{Query, QueryDataset};
use crate::sparql::dataset::DatasetView;
use crate::sparql::error::EvaluationError;
use crate::sparql::model::*;
use crate::sparql::service::ServiceHandler;
use crate::sparql::CustomFunctionRegistry;
use crate::storage::numeric_encoder::*;
use crate::storage::small_string::SmallString;
use digest::Digest;
@ -35,6 +36,7 @@ use std::hash::{Hash, Hasher};
use std::iter::Iterator;
use std::iter::{empty, once};
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, io, str};
const REGEX_SIZE_LIMIT: usize = 1_000_000;
@ -119,15 +121,14 @@ impl IntoIterator for EncodedTuple {
}
type EncodedTuplesIterator = Box<dyn Iterator<Item = Result<EncodedTuple, EvaluationError>>>;
type CustomFunctionRegistry = HashMap<NamedNode, Rc<dyn Fn(&[Term]) -> Option<Term>>>;
#[derive(Clone)]
pub struct SimpleEvaluator {
dataset: Rc<DatasetView>,
base_iri: Option<Rc<Iri<String>>>,
now: DateTime,
service_handler: Rc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Rc<CustomFunctionRegistry>,
service_handler: Arc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Arc<CustomFunctionRegistry>,
run_stats: bool,
}
@ -135,8 +136,8 @@ impl SimpleEvaluator {
pub fn new(
dataset: Rc<DatasetView>,
base_iri: Option<Rc<Iri<String>>>,
service_handler: Rc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Rc<CustomFunctionRegistry>,
service_handler: Arc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Arc<CustomFunctionRegistry>,
run_stats: bool,
) -> Self {
Self {
@ -149,7 +150,6 @@ impl SimpleEvaluator {
}
}
#[allow(clippy::rc_buffer)]
pub fn evaluate_select(&self, pattern: &GraphPattern) -> (QueryResults, Rc<EvalNodeWithStats>) {
let mut variables = Vec::new();
let (eval, stats) = self.graph_pattern_evaluator(pattern, &mut variables);
@ -158,7 +158,7 @@ impl SimpleEvaluator {
QueryResults::Solutions(decode_bindings(
Rc::clone(&self.dataset),
eval(from),
Rc::from(variables),
Arc::from(variables),
)),
stats,
)
@ -3127,11 +3127,10 @@ pub(super) fn compile_pattern(pattern: &str, flags: Option<&str>) -> Option<Rege
regex_builder.build().ok()
}
#[allow(clippy::rc_buffer)]
fn decode_bindings(
dataset: Rc<DatasetView>,
iter: EncodedTuplesIterator,
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
) -> QuerySolutionIter {
let tuple_size = variables.len();
QuerySolutionIter::new(
@ -3223,15 +3222,15 @@ fn equals(a: &EncodedTerm, b: &EncodedTerm) -> Option<bool> {
EncodedTerm::FloatLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => Some(a == b),
EncodedTerm::DoubleLiteral(b) => Some(Double::from(*a) == *b),
EncodedTerm::IntegerLiteral(b) => Some(*a == Float::from(*b)),
EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).into()),
_ if b.is_unknown_typed_literal() => None,
_ => Some(false),
},
EncodedTerm::DoubleLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => Some(*a == Double::from(*b)),
EncodedTerm::FloatLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DoubleLiteral(b) => Some(a == b),
EncodedTerm::IntegerLiteral(b) => Some(*a == Double::from(*b)),
EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).into()),
_ if b.is_unknown_typed_literal() => None,
_ => Some(false),
@ -3247,7 +3246,7 @@ fn equals(a: &EncodedTerm, b: &EncodedTerm) -> Option<bool> {
EncodedTerm::DecimalLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => Some(Float::from(*a) == *b),
EncodedTerm::DoubleLiteral(b) => Some(Double::from(*a) == *b),
EncodedTerm::IntegerLiteral(b) => Some(*a == Decimal::from(*b)),
EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DecimalLiteral(b) => Some(a == b),
_ if b.is_unknown_typed_literal() => None,
_ => Some(false),

@ -30,6 +30,7 @@ use sparopt::algebra::GraphPattern;
use sparopt::Optimizer;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};
@ -56,7 +57,7 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_select(&pattern);
@ -76,7 +77,7 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_ask(&pattern);
@ -99,7 +100,7 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_construct(&pattern, &template);
@ -119,7 +120,7 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_describe(&pattern);
@ -155,19 +156,22 @@ pub(crate) fn evaluate_query(
/// ```
#[derive(Clone, Default)]
pub struct QueryOptions {
service_handler: Option<Rc<dyn ServiceHandler<Error = EvaluationError>>>,
custom_functions: HashMap<NamedNode, Rc<dyn Fn(&[Term]) -> Option<Term>>>,
service_handler: Option<Arc<dyn ServiceHandler<Error = EvaluationError>>>,
custom_functions: CustomFunctionRegistry,
http_timeout: Option<Duration>,
http_redirection_limit: usize,
without_optimizations: bool,
}
pub(crate) type CustomFunctionRegistry =
HashMap<NamedNode, Arc<dyn (Fn(&[Term]) -> Option<Term>) + Send + Sync>>;
impl QueryOptions {
/// Use a given [`ServiceHandler`] to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls.
#[inline]
#[must_use]
pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self {
self.service_handler = Some(Rc::new(ErrorConversionServiceHandler::wrap(
self.service_handler = Some(Arc::new(ErrorConversionServiceHandler::wrap(
service_handler,
)));
self
@ -177,7 +181,7 @@ impl QueryOptions {
#[inline]
#[must_use]
pub fn without_service_handler(mut self) -> Self {
self.service_handler = Some(Rc::new(EmptyServiceHandler));
self.service_handler = Some(Arc::new(EmptyServiceHandler));
self
}
@ -227,21 +231,21 @@ impl QueryOptions {
pub fn with_custom_function(
mut self,
name: NamedNode,
evaluator: impl Fn(&[Term]) -> Option<Term> + 'static,
evaluator: impl Fn(&[Term]) -> Option<Term> + Send + Sync + 'static,
) -> Self {
self.custom_functions.insert(name, Rc::new(evaluator));
self.custom_functions.insert(name, Arc::new(evaluator));
self
}
fn service_handler(&self) -> Rc<dyn ServiceHandler<Error = EvaluationError>> {
fn service_handler(&self) -> Arc<dyn ServiceHandler<Error = EvaluationError>> {
self.service_handler.clone().unwrap_or_else(|| {
if cfg!(feature = "http-client") {
Rc::new(service::SimpleServiceHandler::new(
Arc::new(service::SimpleServiceHandler::new(
self.http_timeout,
self.http_redirection_limit,
))
} else {
Rc::new(EmptyServiceHandler)
Arc::new(EmptyServiceHandler)
}
})
}

@ -8,7 +8,7 @@ use crate::sparql::results::{
use oxrdf::{Variable, VariableRef};
pub use sparesults::QuerySolution;
use std::io::{Read, Write};
use std::rc::Rc;
use std::sync::Arc;
/// Results of a [SPARQL query](https://www.w3.org/TR/sparql11-query/).
pub enum QueryResults {
@ -170,22 +170,20 @@ impl<R: Read + 'static> From<FromReadQueryResultsReader<R>> for QueryResults {
/// }
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
#[allow(clippy::rc_buffer)]
pub struct QuerySolutionIter {
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
iter: Box<dyn Iterator<Item = Result<QuerySolution, EvaluationError>>>,
}
impl QuerySolutionIter {
#[allow(clippy::rc_buffer)]
pub fn new(
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
iter: impl Iterator<Item = Result<Vec<Option<Term>>, EvaluationError>> + 'static,
) -> Self {
Self {
variables: Rc::clone(&variables),
variables: Arc::clone(&variables),
iter: Box::new(
iter.map(move |t| t.map(|values| (Rc::clone(&variables), values).into())),
iter.map(move |t| t.map(|values| (Arc::clone(&variables), values).into())),
),
}
}
@ -211,7 +209,7 @@ impl QuerySolutionIter {
impl<R: Read + 'static> From<FromReadSolutionsReader<R>> for QuerySolutionIter {
fn from(reader: FromReadSolutionsReader<R>) -> Self {
Self {
variables: Rc::new(reader.variables().to_vec()),
variables: reader.variables().into(),
iter: Box::new(reader.map(|t| t.map_err(EvaluationError::from))),
}
}
@ -291,10 +289,12 @@ mod tests {
QueryResults::Boolean(true),
QueryResults::Boolean(false),
QueryResults::Solutions(QuerySolutionIter::new(
Rc::new(vec![
[
Variable::new_unchecked("foo"),
Variable::new_unchecked("bar"),
]),
]
.as_ref()
.into(),
Box::new(
vec![
Ok(vec![None, None]),

@ -48,7 +48,7 @@ use std::time::Duration;
/// }
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub trait ServiceHandler {
pub trait ServiceHandler: Send + Sync {
type Error: Error + Send + Sync + 'static;
/// Evaluates a [`Query`] against a given service identified by a [`NamedNode`].

@ -19,6 +19,7 @@ use sparopt::Optimizer;
use std::collections::HashMap;
use std::io;
use std::rc::Rc;
use std::sync::Arc;
pub fn evaluate_update<'a, 'b: 'a>(
transaction: &'a mut StorageWriter<'b>,
@ -131,7 +132,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> {
Rc::clone(&dataset),
self.base_iri.clone(),
self.options.query_options.service_handler(),
Rc::new(self.options.query_options.custom_functions.clone()),
Arc::new(self.options.query_options.custom_functions.clone()),
false,
);
let mut variables = Vec::new();

@ -8,8 +8,8 @@ use siphasher::sip128::{Hasher128, SipHasher24};
use std::fmt::Debug;
use std::hash::Hash;
use std::hash::Hasher;
use std::rc::Rc;
use std::str;
use std::sync::Arc;
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
#[repr(transparent)]
@ -96,7 +96,7 @@ pub enum EncodedTerm {
DurationLiteral(Duration),
YearMonthDurationLiteral(YearMonthDuration),
DayTimeDurationLiteral(DayTimeDuration),
Triple(Rc<EncodedTriple>),
Triple(Arc<EncodedTriple>),
}
impl PartialEq for EncodedTerm {
@ -471,7 +471,7 @@ impl From<DayTimeDuration> for EncodedTerm {
impl From<EncodedTriple> for EncodedTerm {
fn from(value: EncodedTriple) -> Self {
Self::Triple(Rc::new(value))
Self::Triple(Arc::new(value))
}
}
@ -634,7 +634,7 @@ impl From<GraphNameRef<'_>> for EncodedTerm {
impl From<TripleRef<'_>> for EncodedTerm {
fn from(triple: TripleRef<'_>) -> Self {
Self::Triple(Rc::new(triple.into()))
Self::Triple(Arc::new(triple.into()))
}
}

Loading…
Cancel
Save