Makes SPARQL query result Send and Sync

pull/689/head
Tpt 1 year ago
parent 03afe5c6c6
commit 9bc04c113e
12 changed files:
  1. lib/oxrdf/src/dataset.rs (1 changed line)
  2. lib/sparesults/src/csv.rs (3 changed lines)
  3. lib/sparesults/src/lib.rs (13 changed lines)
  4. lib/sparesults/src/solution.rs (7 changed lines)
  5. lib/src/sparql/eval.rs (45 changed lines)
  6. lib/src/sparql/mod.rs (32 changed lines)
  7. lib/src/sparql/model.rs (17 changed lines)
  8. lib/src/sparql/plan_builder.rs (12 changed lines)
  9. lib/src/sparql/service.rs (2 changed lines)
  10. lib/src/sparql/update.rs (3 changed lines)
  11. lib/src/storage/backend/mod.rs (4 changed lines)
  12. lib/src/storage/numeric_encoder.rs (8 changed lines)

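Editor's note on the overall effect, before the per-file hunks: once QuerySolution stores its variables as Arc<[Variable]> instead of Rc<Vec<Variable>> (see the lib/sparesults/src/solution.rs hunk below), solutions become Send + Sync and can be handed to another thread. A minimal sketch, using only the sparesults/oxrdf APIs visible in this diff:

use oxrdf::{Literal, Term, Variable};
use sparesults::QuerySolution;
use std::sync::Arc;
use std::thread;

fn main() {
    // Build a solution through the From<(V, S)> impl touched below:
    // V: Into<Arc<[Variable]>>, S: Into<Vec<Option<Term>>>.
    let variables: Arc<[Variable]> = vec![Variable::new_unchecked("foo")].into();
    let solution: QuerySolution =
        (Arc::clone(&variables), vec![Some(Term::from(Literal::from(1)))]).into();

    // With Arc instead of Rc, QuerySolution is Send + Sync, so moving it
    // into another thread now compiles.
    let handle = thread::spawn(move || solution.get("foo").cloned());
    assert_eq!(handle.join().unwrap(), Some(Literal::from(1).into()));
}
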
@@ -183,6 +183,7 @@ impl Dataset {
.map(move |q| self.decode_spog(q))
}
#[allow(clippy::map_identity)]
fn interned_quads_for_subject(
&self,
subject: &InternedSubject,

@@ -376,7 +376,6 @@ mod tests {
use super::*;
use std::error::Error;
use std::io::Cursor;
use std::rc::Rc;
use std::str;
fn build_example() -> (Vec<Variable>, Vec<Vec<Option<Term>>>) {
@@ -430,7 +429,6 @@ mod tests {
fn test_csv_serialization() -> io::Result<()> {
let (variables, solutions) = build_example();
let mut writer = CsvSolutionsWriter::start(Vec::new(), variables.clone())?;
let variables = Rc::new(variables);
for solution in solutions {
writer.write(
variables
@@ -450,7 +448,6 @@ mod tests {
// Write
let mut writer = TsvSolutionsWriter::start(Vec::new(), variables.clone())?;
let variables = Rc::new(variables);
for solution in &solutions {
writer.write(
variables

@@ -17,7 +17,7 @@ pub use crate::solution::QuerySolution;
use crate::xml::*;
use oxrdf::{TermRef, Variable, VariableRef};
use std::io::{self, BufRead, Write};
use std::rc::Rc;
use std::sync::Arc;
/// [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats.
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
@@ -198,7 +198,7 @@ impl QueryResultsParser {
solutions,
variables,
} => QueryResultsReader::Solutions(SolutionsReader {
variables: Rc::new(variables),
variables: variables.into(),
solutions: SolutionsReaderKind::Xml(solutions),
}),
},
@@ -208,7 +208,7 @@ impl QueryResultsParser {
solutions,
variables,
} => QueryResultsReader::Solutions(SolutionsReader {
variables: Rc::new(variables),
variables: variables.into(),
solutions: SolutionsReaderKind::Json(solutions),
}),
},
@@ -219,7 +219,7 @@ impl QueryResultsParser {
solutions,
variables,
} => QueryResultsReader::Solutions(SolutionsReader {
variables: Rc::new(variables),
variables: variables.into(),
solutions: SolutionsReaderKind::Tsv(solutions),
}),
},
@@ -275,9 +275,8 @@ pub enum QueryResultsReader<R: BufRead> {
/// }
/// # Result::<(),sparesults::ParseError>::Ok(())
/// ```
#[allow(clippy::rc_buffer)]
pub struct SolutionsReader<R: BufRead> {
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
solutions: SolutionsReaderKind<R>,
}
@@ -318,7 +317,7 @@ impl<R: BufRead> Iterator for SolutionsReader<R> {
SolutionsReaderKind::Tsv(reader) => reader.read_next(),
}
.transpose()?
.map(|values| (Rc::clone(&self.variables), values).into()),
.map(|values| (Arc::clone(&self.variables), values).into()),
)
}
}
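
The variables.into() calls above rely on the standard-library From<Vec<T>> for Arc<[T]> conversion; a quick sketch of what the parser now stores (nothing Oxigraph-specific beyond Variable):

use oxrdf::Variable;
use std::sync::Arc;

fn main() {
    // Vec<Variable> -> Arc<[Variable]>: this is what `variables.into()` does above.
    let variables = vec![Variable::new_unchecked("s"), Variable::new_unchecked("o")];
    let shared: Arc<[Variable]> = variables.into();

    // Cloning is a cheap reference-count bump, and the shared slice has one less
    // indirection than Rc<Vec<Variable>> while also being Send + Sync.
    let clone = Arc::clone(&shared);
    assert!(Arc::ptr_eq(&shared, &clone));
    assert_eq!(shared.len(), 2);
}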

@@ -4,7 +4,7 @@ use oxrdf::{Term, Variable, VariableRef};
use std::fmt;
use std::iter::Zip;
use std::ops::Index;
use std::rc::Rc;
use std::sync::Arc;
/// Tuple associating variables and terms that are the result of a SPARQL query.
///
@@ -18,9 +18,8 @@ use std::rc::Rc;
/// assert_eq!(solution.get("foo"), Some(&Literal::from(1).into())); // Get the value of the variable ?foo if it exists (here yes).
/// assert_eq!(solution.get(1), None); // Get the value of the second column if it exists (here no).
/// ```
#[allow(clippy::rc_buffer)]
pub struct QuerySolution {
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
values: Vec<Option<Term>>,
}
@@ -116,7 +115,7 @@ impl QuerySolution {
}
}
impl<V: Into<Rc<Vec<Variable>>>, S: Into<Vec<Option<Term>>>> From<(V, S)> for QuerySolution {
impl<V: Into<Arc<[Variable]>>, S: Into<Vec<Option<Term>>>> From<(V, S)> for QuerySolution {
#[inline]
fn from((v, s): (V, S)) -> Self {
Self {

@@ -1,11 +1,12 @@
use crate::model::vocab::{rdf, xsd};
use crate::model::{BlankNode, LiteralRef, NamedNode, NamedNodeRef, Term, Triple};
use crate::model::{BlankNode, LiteralRef, NamedNodeRef, Term, Triple};
use crate::sparql::algebra::{Query, QueryDataset};
use crate::sparql::dataset::DatasetView;
use crate::sparql::error::EvaluationError;
use crate::sparql::model::*;
use crate::sparql::plan::*;
use crate::sparql::service::ServiceHandler;
use crate::sparql::CustomFunctionRegistry;
use crate::storage::numeric_encoder::*;
use crate::storage::small_string::SmallString;
use digest::Digest;
@@ -28,6 +29,7 @@ use std::iter::Iterator;
use std::iter::{empty, once};
use std::rc::Rc;
use std::str;
use std::sync::Arc;
use std::time::Duration as StdDuration;
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
use std::time::Instant;
@@ -35,15 +37,14 @@ use std::time::Instant;
const REGEX_SIZE_LIMIT: usize = 1_000_000;
type EncodedTuplesIterator = Box<dyn Iterator<Item = Result<EncodedTuple, EvaluationError>>>;
type CustomFunctionRegistry = HashMap<NamedNode, Rc<dyn Fn(&[Term]) -> Option<Term>>>;
#[derive(Clone)]
pub struct SimpleEvaluator {
dataset: Rc<DatasetView>,
base_iri: Option<Rc<Iri<String>>>,
now: DateTime,
service_handler: Rc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Rc<CustomFunctionRegistry>,
service_handler: Arc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Arc<CustomFunctionRegistry>,
run_stats: bool,
}
@@ -51,8 +52,8 @@ impl SimpleEvaluator {
pub fn new(
dataset: Rc<DatasetView>,
base_iri: Option<Rc<Iri<String>>>,
service_handler: Rc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Rc<CustomFunctionRegistry>,
service_handler: Arc<dyn ServiceHandler<Error = EvaluationError>>,
custom_functions: Arc<CustomFunctionRegistry>,
run_stats: bool,
) -> Self {
Self {
@@ -69,7 +70,7 @@ impl SimpleEvaluator {
pub fn evaluate_select_plan(
&self,
plan: Rc<PlanNode>,
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
) -> (QueryResults, Rc<PlanNodeWithStats>) {
let (eval, stats) = self.plan_evaluator(plan);
(
@@ -1982,9 +1983,7 @@ impl SimpleEvaluator {
EncodedTerm::DoubleLiteral(value) => {
Some(Decimal::try_from(value).ok()?.into())
}
EncodedTerm::IntegerLiteral(value) => {
Some(Decimal::try_from(value).ok()?.into())
}
EncodedTerm::IntegerLiteral(value) => Some(Decimal::from(value).into()),
EncodedTerm::DecimalLiteral(value) => Some(value.into()),
EncodedTerm::BooleanLiteral(value) => Some(Decimal::from(value).into()),
EncodedTerm::SmallStringLiteral(value) => parse_decimal_str(&value),
@@ -2359,7 +2358,7 @@ pub(super) fn compile_pattern(pattern: &str, flags: Option<&str>) -> Option<Rege
fn decode_bindings(
dataset: Rc<DatasetView>,
iter: EncodedTuplesIterator,
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
) -> QuerySolutionIter {
let tuple_size = variables.len();
QuerySolutionIter::new(
@@ -2451,16 +2450,16 @@ fn equals(a: &EncodedTerm, b: &EncodedTerm) -> Option<bool> {
EncodedTerm::FloatLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => Some(a == b),
EncodedTerm::DoubleLiteral(b) => Some(Double::from(*a) == *b),
EncodedTerm::IntegerLiteral(b) => Some(*a == Float::from(*b)),
EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).try_into().ok()?),
EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).into()),
_ if b.is_unknown_typed_literal() => None,
_ => Some(false),
},
EncodedTerm::DoubleLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => Some(*a == Double::from(*b)),
EncodedTerm::FloatLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DoubleLiteral(b) => Some(a == b),
EncodedTerm::IntegerLiteral(b) => Some(*a == Double::from(*b)),
EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).try_into().ok()?),
EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).into()),
_ if b.is_unknown_typed_literal() => None,
_ => Some(false),
},
@@ -2473,9 +2472,9 @@ fn equals(a: &EncodedTerm, b: &EncodedTerm) -> Option<bool> {
_ => Some(false),
},
EncodedTerm::DecimalLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => Some(Float::try_from(*a).ok()? == *b),
EncodedTerm::DoubleLiteral(b) => Some(Double::try_from(*a).ok()? == *b),
EncodedTerm::IntegerLiteral(b) => Some(*a == Decimal::from(*b)),
EncodedTerm::FloatLiteral(b) => Some(Float::from(*a) == *b),
EncodedTerm::DoubleLiteral(b) => Some(Double::from(*a) == *b),
EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()),
EncodedTerm::DecimalLiteral(b) => Some(a == b),
_ if b.is_unknown_typed_literal() => None,
_ => Some(false),
@@ -2739,14 +2738,14 @@ fn partial_cmp_literals(
EncodedTerm::FloatLiteral(b) => a.partial_cmp(b),
EncodedTerm::DoubleLiteral(b) => Double::from(*a).partial_cmp(b),
EncodedTerm::IntegerLiteral(b) => a.partial_cmp(&Float::from(*b)),
EncodedTerm::DecimalLiteral(b) => a.partial_cmp(&(*b).try_into().ok()?),
EncodedTerm::DecimalLiteral(b) => a.partial_cmp(&(*b).into()),
_ => None,
},
EncodedTerm::DoubleLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => a.partial_cmp(&(*b).into()),
EncodedTerm::DoubleLiteral(b) => a.partial_cmp(b),
EncodedTerm::IntegerLiteral(b) => a.partial_cmp(&Double::from(*b)),
EncodedTerm::DecimalLiteral(b) => a.partial_cmp(&(*b).try_into().ok()?),
EncodedTerm::DecimalLiteral(b) => a.partial_cmp(&(*b).into()),
_ => None,
},
EncodedTerm::IntegerLiteral(a) => match b {
@@ -2757,8 +2756,8 @@ fn partial_cmp_literals(
_ => None,
},
EncodedTerm::DecimalLiteral(a) => match b {
EncodedTerm::FloatLiteral(b) => Float::try_from(*a).ok()?.partial_cmp(b),
EncodedTerm::DoubleLiteral(b) => Double::try_from(*a).ok()?.partial_cmp(b),
EncodedTerm::FloatLiteral(b) => Float::from(*a).partial_cmp(b),
EncodedTerm::DoubleLiteral(b) => Double::from(*a).partial_cmp(b),
EncodedTerm::IntegerLiteral(b) => a.partial_cmp(&Decimal::from(*b)),
EncodedTerm::DecimalLiteral(b) => a.partial_cmp(b),
_ => None,
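
Two things drive the eval.rs changes: the evaluator's shared callbacks move from Rc to Arc with explicit Send + Sync bounds, and the numeric comparisons drop try_into where the conversions are infallible. The thread-safety part in isolation, as a plain-std sketch:

use std::sync::Arc;

// An Arc<dyn Fn> is only Send + Sync if the trait object's bounds say so,
// which is why the registry type now spells them out explicitly.
type SharedFn = Arc<dyn Fn(i32) -> i32 + Send + Sync>;

fn assert_send_sync<T: Send + Sync>(_: &T) {}

fn main() {
    let f: SharedFn = Arc::new(|x| x + 1);
    assert_send_sync(&f); // compiles only because of the + Send + Sync bounds
    assert_eq!(f(41), 42);
}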

@@ -31,6 +31,7 @@ pub use sparesults::QueryResultsFormat;
pub use spargebra::ParseError;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use std::{fmt, io};
@@ -60,10 +61,10 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_select_plan(Rc::new(plan), Rc::new(variables));
.evaluate_select_plan(Rc::new(plan), variables.into());
(Ok(results), explanation, planning_duration)
}
spargebra::Query::Ask {
@@ -81,7 +82,7 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_ask_plan(Rc::new(plan));
@@ -112,7 +113,7 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_construct_plan(Rc::new(plan), construct);
@@ -133,7 +134,7 @@ pub(crate) fn evaluate_query(
Rc::new(dataset),
base_iri.map(Rc::new),
options.service_handler(),
Rc::new(options.custom_functions),
Arc::new(options.custom_functions),
run_stats,
)
.evaluate_describe_plan(Rc::new(plan));
@@ -169,19 +170,22 @@ pub(crate) fn evaluate_query(
/// ```
#[derive(Clone, Default)]
pub struct QueryOptions {
service_handler: Option<Rc<dyn ServiceHandler<Error = EvaluationError>>>,
custom_functions: HashMap<NamedNode, Rc<dyn Fn(&[Term]) -> Option<Term>>>,
service_handler: Option<Arc<dyn ServiceHandler<Error = EvaluationError>>>,
custom_functions: CustomFunctionRegistry,
http_timeout: Option<Duration>,
http_redirection_limit: usize,
without_optimizations: bool,
}
pub(crate) type CustomFunctionRegistry =
HashMap<NamedNode, Arc<dyn (Fn(&[Term]) -> Option<Term>) + Send + Sync>>;
impl QueryOptions {
/// Use a given [`ServiceHandler`] to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls.
#[inline]
#[must_use]
pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self {
self.service_handler = Some(Rc::new(ErrorConversionServiceHandler::wrap(
self.service_handler = Some(Arc::new(ErrorConversionServiceHandler::wrap(
service_handler,
)));
self
@@ -191,7 +195,7 @@ impl QueryOptions {
#[inline]
#[must_use]
pub fn without_service_handler(mut self) -> Self {
self.service_handler = Some(Rc::new(EmptyServiceHandler));
self.service_handler = Some(Arc::new(EmptyServiceHandler));
self
}
@@ -241,21 +245,21 @@ impl QueryOptions {
pub fn with_custom_function(
mut self,
name: NamedNode,
evaluator: impl Fn(&[Term]) -> Option<Term> + 'static,
evaluator: impl Fn(&[Term]) -> Option<Term> + Send + Sync + 'static,
) -> Self {
self.custom_functions.insert(name, Rc::new(evaluator));
self.custom_functions.insert(name, Arc::new(evaluator));
self
}
fn service_handler(&self) -> Rc<dyn ServiceHandler<Error = EvaluationError>> {
fn service_handler(&self) -> Arc<dyn ServiceHandler<Error = EvaluationError>> {
self.service_handler.clone().unwrap_or_else(|| {
if cfg!(feature = "http_client") {
Rc::new(service::SimpleServiceHandler::new(
Arc::new(service::SimpleServiceHandler::new(
self.http_timeout,
self.http_redirection_limit,
))
} else {
Rc::new(EmptyServiceHandler)
Arc::new(EmptyServiceHandler)
}
})
}
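
Because CustomFunctionRegistry now stores Arc<dyn Fn(..) + Send + Sync>, closures passed to with_custom_function must themselves be thread-safe. A small usage sketch (the function IRI is made up for illustration):

use oxigraph::model::{Literal, NamedNode, Term};
use oxigraph::sparql::QueryOptions;

fn main() {
    // The closure captures nothing non-thread-safe, so it satisfies the new
    // `Fn(&[Term]) -> Option<Term> + Send + Sync + 'static` bound.
    let _options = QueryOptions::default().with_custom_function(
        NamedNode::new("http://example.com/upper").unwrap(),
        |args: &[Term]| match args {
            [Term::Literal(l)] => {
                Some(Literal::new_simple_literal(l.value().to_uppercase()).into())
            }
            _ => None,
        },
    );
}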

@@ -9,7 +9,7 @@ use sparesults::{
SolutionsReader,
};
use std::io::{BufRead, Write};
use std::rc::Rc;
use std::sync::Arc;
/// Results of a [SPARQL query](https://www.w3.org/TR/sparql11-query/).
pub enum QueryResults {
@@ -162,19 +162,19 @@ impl<R: BufRead + 'static> From<QueryResultsReader<R>> for QueryResults {
/// ```
#[allow(clippy::rc_buffer)]
pub struct QuerySolutionIter {
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
iter: Box<dyn Iterator<Item = Result<QuerySolution, EvaluationError>>>,
}
impl QuerySolutionIter {
pub fn new(
variables: Rc<Vec<Variable>>,
variables: Arc<[Variable]>,
iter: impl Iterator<Item = Result<Vec<Option<Term>>, EvaluationError>> + 'static,
) -> Self {
Self {
variables: Rc::clone(&variables),
variables: Arc::clone(&variables),
iter: Box::new(
iter.map(move |t| t.map(|values| (Rc::clone(&variables), values).into())),
iter.map(move |t| t.map(|values| (Arc::clone(&variables), values).into())),
),
}
}
@@ -200,7 +200,7 @@ impl QuerySolutionIter {
impl<R: BufRead + 'static> From<SolutionsReader<R>> for QuerySolutionIter {
fn from(reader: SolutionsReader<R>) -> Self {
Self {
variables: Rc::new(reader.variables().to_vec()),
variables: reader.variables().into(),
iter: Box::new(reader.map(|t| t.map_err(EvaluationError::from))),
}
}
@@ -274,10 +274,11 @@ fn test_serialization_roundtrip() -> Result<(), EvaluationError> {
QueryResults::Boolean(true),
QueryResults::Boolean(false),
QueryResults::Solutions(QuerySolutionIter::new(
Rc::new(vec![
[
Variable::new_unchecked("foo"),
Variable::new_unchecked("bar"),
]),
]
.into(),
Box::new(
vec![
Ok(vec![None, None]),

@@ -1,8 +1,8 @@
use crate::model::Term as OxTerm;
use crate::sparql::dataset::DatasetView;
use crate::sparql::error::EvaluationError;
use crate::sparql::eval::compile_pattern;
use crate::sparql::plan::*;
use crate::sparql::CustomFunctionRegistry;
use crate::storage::numeric_encoder::{EncodedTerm, EncodedTriple};
use oxrdf::vocab::xsd;
use oxrdf::TermRef;
@@ -10,13 +10,13 @@ use rand::random;
use regex::Regex;
use spargebra::algebra::*;
use spargebra::term::*;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashSet};
use std::mem::swap;
use std::rc::Rc;
pub struct PlanBuilder<'a> {
dataset: &'a DatasetView,
custom_functions: &'a HashMap<NamedNode, Rc<dyn Fn(&[OxTerm]) -> Option<OxTerm>>>,
custom_functions: &'a CustomFunctionRegistry,
with_optimizations: bool,
}
@@ -25,7 +25,7 @@ impl<'a> PlanBuilder<'a> {
dataset: &'a DatasetView,
pattern: &GraphPattern,
is_cardinality_meaningful: bool,
custom_functions: &'a HashMap<NamedNode, Rc<dyn Fn(&[OxTerm]) -> Option<OxTerm>>>,
custom_functions: &'a CustomFunctionRegistry,
without_optimizations: bool,
) -> Result<(PlanNode, Vec<Variable>), EvaluationError> {
let mut variables = Vec::default();
@@ -58,7 +58,7 @@ impl<'a> PlanBuilder<'a> {
dataset: &'a DatasetView,
template: &[TriplePattern],
mut variables: Vec<Variable>,
custom_functions: &'a HashMap<NamedNode, Rc<dyn Fn(&[OxTerm]) -> Option<OxTerm>>>,
custom_functions: &'a CustomFunctionRegistry,
without_optimizations: bool,
) -> Vec<TripleTemplate> {
PlanBuilder {
@@ -507,7 +507,7 @@ impl<'a> PlanBuilder<'a> {
variables,
graph_name,
)?)),
Function::BNode => PlanExpression::BNode(match parameters.get(0) {
Function::BNode => PlanExpression::BNode(match parameters.first() {
Some(e) => Some(Box::new(
self.build_for_expression(e, variables, graph_name)?,
)),
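
The only non-mechanical change in this file is parameters.get(0) becoming parameters.first(), the spelling clippy's get_first lint prefers; the two are equivalent:

fn main() {
    let parameters = ["a", "b"];
    // first() and get(0) return the same Option<&T>; first() just reads better.
    assert_eq!(parameters.first(), parameters.get(0));
    let empty: [&str; 0] = [];
    assert_eq!(empty.first(), None);
}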

@@ -49,7 +49,7 @@ use std::time::Duration;
/// }
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub trait ServiceHandler {
pub trait ServiceHandler: Send + Sync {
type Error: Error + Send + Sync + 'static;
/// Evaluates a [`Query`] against a given service identified by a [`NamedNode`].
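
With the trait now declared ServiceHandler: Send + Sync, every implementor has to be thread-safe. A sketch of a conforming handler; the handle signature is assumed from the surrounding 0.3 API and the error type is just an illustration:

use oxigraph::model::NamedNode;
use oxigraph::sparql::{Query, QueryResults, ServiceHandler};

// A unit struct is trivially Send + Sync, so it satisfies the new supertraits.
struct DenyAllServices;

impl ServiceHandler for DenyAllServices {
    type Error = std::io::Error; // any Error + Send + Sync + 'static works

    fn handle(&self, service_name: NamedNode, _query: Query) -> Result<QueryResults, Self::Error> {
        Err(std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("SERVICE {service_name} is not allowed here"),
        ))
    }
}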

@@ -21,6 +21,7 @@ use spargebra::GraphUpdateOperation;
use std::collections::HashMap;
use std::io::BufReader;
use std::rc::Rc;
use std::sync::Arc;
pub fn evaluate_update<'a, 'b: 'a>(
transaction: &'a mut StorageWriter<'b>,
@@ -136,7 +137,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> {
Rc::clone(&dataset),
self.base_iri.clone(),
self.options.query_options.service_handler(),
Rc::new(self.options.query_options.custom_functions.clone()),
Arc::new(self.options.query_options.custom_functions.clone()),
false,
);
let mut bnodes = HashMap::new();

@@ -4,9 +4,7 @@
#[cfg(target_family = "wasm")]
pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction};
#[cfg(not(target_family = "wasm"))]
pub use rocksdb::{
ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, SstFileWriter, Transaction,
};
pub use rocksdb::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction};
#[cfg(target_family = "wasm")]
mod fallback;

@@ -8,8 +8,8 @@ use siphasher::sip128::{Hasher128, SipHasher24};
use std::fmt::Debug;
use std::hash::Hash;
use std::hash::Hasher;
use std::rc::Rc;
use std::str;
use std::sync::Arc;
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
#[repr(transparent)]
@@ -96,7 +96,7 @@ pub enum EncodedTerm {
DurationLiteral(Duration),
YearMonthDurationLiteral(YearMonthDuration),
DayTimeDurationLiteral(DayTimeDuration),
Triple(Rc<EncodedTriple>),
Triple(Arc<EncodedTriple>),
}
impl PartialEq for EncodedTerm {
@@ -471,7 +471,7 @@ impl From<DayTimeDuration> for EncodedTerm {
impl From<EncodedTriple> for EncodedTerm {
fn from(value: EncodedTriple) -> Self {
Self::Triple(Rc::new(value))
Self::Triple(Arc::new(value))
}
}
@@ -634,7 +634,7 @@ impl From<GraphNameRef<'_>> for EncodedTerm {
impl From<TripleRef<'_>> for EncodedTerm {
fn from(triple: TripleRef<'_>) -> Self {
Self::Triple(Rc::new(triple.into()))
Self::Triple(Arc::new(triple.into()))
}
}
