From 899e55324956a133940b7fcd8a5bf79cba5dc614 Mon Sep 17 00:00:00 2001 From: Tpt Date: Sun, 3 Dec 2023 21:12:13 +0100 Subject: [PATCH] Makes SPARQL query result Send and Sync --- lib/sparesults/src/csv.rs | 3 --- lib/sparesults/src/parser.rs | 13 ++++++------- lib/sparesults/src/solution.rs | 7 +++---- lib/src/sparql/eval.rs | 27 +++++++++++++-------------- lib/src/sparql/mod.rs | 30 +++++++++++++++++------------- lib/src/sparql/model.rs | 20 ++++++++++---------- lib/src/sparql/service.rs | 2 +- lib/src/sparql/update.rs | 3 ++- lib/src/storage/numeric_encoder.rs | 8 ++++---- 9 files changed, 56 insertions(+), 57 deletions(-) diff --git a/lib/sparesults/src/csv.rs b/lib/sparesults/src/csv.rs index ac02c99e..985092b4 100644 --- a/lib/sparesults/src/csv.rs +++ b/lib/sparesults/src/csv.rs @@ -628,7 +628,6 @@ mod tests { use super::*; use std::error::Error; - use std::rc::Rc; fn build_example() -> (Vec, Vec>>) { ( @@ -682,7 +681,6 @@ mod tests { let (variables, solutions) = build_example(); let mut buffer = String::new(); let writer = InnerCsvSolutionsWriter::start(&mut buffer, variables.clone()); - let variables = Rc::new(variables); for solution in solutions { writer.write( &mut buffer, @@ -702,7 +700,6 @@ mod tests { // Write let mut buffer = String::new(); let writer = InnerTsvSolutionsWriter::start(&mut buffer, variables.clone()); - let variables = Rc::new(variables); for solution in &solutions { writer.write( &mut buffer, diff --git a/lib/sparesults/src/parser.rs b/lib/sparesults/src/parser.rs index 8833f9ac..b0aad24c 100644 --- a/lib/sparesults/src/parser.rs +++ b/lib/sparesults/src/parser.rs @@ -6,7 +6,7 @@ use crate::solution::QuerySolution; use crate::xml::{XmlQueryResultsReader, XmlSolutionsReader}; use oxrdf::Variable; use std::io::Read; -use std::rc::Rc; +use std::sync::Arc; /// Parsers for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats. /// @@ -81,7 +81,7 @@ impl QueryResultsParser { solutions, variables, } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { - variables: Rc::new(variables), + variables: variables.into(), solutions: SolutionsReaderKind::Xml(solutions), }), }, @@ -91,7 +91,7 @@ impl QueryResultsParser { solutions, variables, } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { - variables: Rc::new(variables), + variables: variables.into(), solutions: SolutionsReaderKind::Json(solutions), }), }, @@ -102,7 +102,7 @@ impl QueryResultsParser { solutions, variables, } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { - variables: Rc::new(variables), + variables: variables.into(), solutions: SolutionsReaderKind::Tsv(solutions), }), }, @@ -166,9 +166,8 @@ pub enum FromReadQueryResultsReader { /// } /// # Result::<(),sparesults::ParseError>::Ok(()) /// ``` -#[allow(clippy::rc_buffer)] pub struct FromReadSolutionsReader { - variables: Rc>, + variables: Arc<[Variable]>, solutions: SolutionsReaderKind, } @@ -209,7 +208,7 @@ impl Iterator for FromReadSolutionsReader { SolutionsReaderKind::Tsv(reader) => reader.read_next(), } .transpose()? - .map(|values| (Rc::clone(&self.variables), values).into()), + .map(|values| (Arc::clone(&self.variables), values).into()), ) } } diff --git a/lib/sparesults/src/solution.rs b/lib/sparesults/src/solution.rs index 842bc7d3..3990ab79 100644 --- a/lib/sparesults/src/solution.rs +++ b/lib/sparesults/src/solution.rs @@ -4,7 +4,7 @@ use oxrdf::{Term, Variable, VariableRef}; use std::fmt; use std::iter::Zip; use std::ops::Index; -use std::rc::Rc; +use std::sync::Arc; /// Tuple associating variables and terms that are the result of a SPARQL query. /// @@ -18,9 +18,8 @@ use std::rc::Rc; /// assert_eq!(solution.get("foo"), Some(&Literal::from(1).into())); // Get the value of the variable ?foo if it exists (here yes). /// assert_eq!(solution.get(1), None); // Get the value of the second column if it exists (here no). /// ``` -#[allow(clippy::rc_buffer)] pub struct QuerySolution { - variables: Rc>, + variables: Arc<[Variable]>, values: Vec>, } @@ -116,7 +115,7 @@ impl QuerySolution { } } -impl>>, S: Into>>> From<(V, S)> for QuerySolution { +impl>, S: Into>>> From<(V, S)> for QuerySolution { #[inline] fn from((v, s): (V, S)) -> Self { Self { diff --git a/lib/src/sparql/eval.rs b/lib/src/sparql/eval.rs index 413fcd20..c5b35a95 100644 --- a/lib/src/sparql/eval.rs +++ b/lib/src/sparql/eval.rs @@ -1,10 +1,11 @@ use crate::model::vocab::{rdf, xsd}; -use crate::model::{BlankNode, LiteralRef, NamedNode, NamedNodeRef, Term, Triple}; +use crate::model::{BlankNode, LiteralRef, NamedNodeRef, Term, Triple}; use crate::sparql::algebra::{Query, QueryDataset}; use crate::sparql::dataset::DatasetView; use crate::sparql::error::EvaluationError; use crate::sparql::model::*; use crate::sparql::service::ServiceHandler; +use crate::sparql::CustomFunctionRegistry; use crate::storage::numeric_encoder::*; use crate::storage::small_string::SmallString; use digest::Digest; @@ -35,6 +36,7 @@ use std::hash::{Hash, Hasher}; use std::iter::Iterator; use std::iter::{empty, once}; use std::rc::Rc; +use std::sync::Arc; use std::{fmt, io, str}; const REGEX_SIZE_LIMIT: usize = 1_000_000; @@ -119,15 +121,14 @@ impl IntoIterator for EncodedTuple { } type EncodedTuplesIterator = Box>>; -type CustomFunctionRegistry = HashMap Option>>; #[derive(Clone)] pub struct SimpleEvaluator { dataset: Rc, base_iri: Option>>, now: DateTime, - service_handler: Rc>, - custom_functions: Rc, + service_handler: Arc>, + custom_functions: Arc, run_stats: bool, } @@ -135,8 +136,8 @@ impl SimpleEvaluator { pub fn new( dataset: Rc, base_iri: Option>>, - service_handler: Rc>, - custom_functions: Rc, + service_handler: Arc>, + custom_functions: Arc, run_stats: bool, ) -> Self { Self { @@ -149,7 +150,6 @@ impl SimpleEvaluator { } } - #[allow(clippy::rc_buffer)] pub fn evaluate_select(&self, pattern: &GraphPattern) -> (QueryResults, Rc) { let mut variables = Vec::new(); let (eval, stats) = self.graph_pattern_evaluator(pattern, &mut variables); @@ -158,7 +158,7 @@ impl SimpleEvaluator { QueryResults::Solutions(decode_bindings( Rc::clone(&self.dataset), eval(from), - Rc::from(variables), + Arc::from(variables), )), stats, ) @@ -3127,11 +3127,10 @@ pub(super) fn compile_pattern(pattern: &str, flags: Option<&str>) -> Option, iter: EncodedTuplesIterator, - variables: Rc>, + variables: Arc<[Variable]>, ) -> QuerySolutionIter { let tuple_size = variables.len(); QuerySolutionIter::new( @@ -3223,15 +3222,15 @@ fn equals(a: &EncodedTerm, b: &EncodedTerm) -> Option { EncodedTerm::FloatLiteral(a) => match b { EncodedTerm::FloatLiteral(b) => Some(a == b), EncodedTerm::DoubleLiteral(b) => Some(Double::from(*a) == *b), - EncodedTerm::IntegerLiteral(b) => Some(*a == Float::from(*b)), + EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()), EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).into()), _ if b.is_unknown_typed_literal() => None, _ => Some(false), }, EncodedTerm::DoubleLiteral(a) => match b { - EncodedTerm::FloatLiteral(b) => Some(*a == Double::from(*b)), + EncodedTerm::FloatLiteral(b) => Some(*a == (*b).into()), EncodedTerm::DoubleLiteral(b) => Some(a == b), - EncodedTerm::IntegerLiteral(b) => Some(*a == Double::from(*b)), + EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()), EncodedTerm::DecimalLiteral(b) => Some(*a == (*b).into()), _ if b.is_unknown_typed_literal() => None, _ => Some(false), @@ -3247,7 +3246,7 @@ fn equals(a: &EncodedTerm, b: &EncodedTerm) -> Option { EncodedTerm::DecimalLiteral(a) => match b { EncodedTerm::FloatLiteral(b) => Some(Float::from(*a) == *b), EncodedTerm::DoubleLiteral(b) => Some(Double::from(*a) == *b), - EncodedTerm::IntegerLiteral(b) => Some(*a == Decimal::from(*b)), + EncodedTerm::IntegerLiteral(b) => Some(*a == (*b).into()), EncodedTerm::DecimalLiteral(b) => Some(a == b), _ if b.is_unknown_typed_literal() => None, _ => Some(false), diff --git a/lib/src/sparql/mod.rs b/lib/src/sparql/mod.rs index 1972a77e..262ab868 100644 --- a/lib/src/sparql/mod.rs +++ b/lib/src/sparql/mod.rs @@ -30,6 +30,7 @@ use sparopt::algebra::GraphPattern; use sparopt::Optimizer; use std::collections::HashMap; use std::rc::Rc; +use std::sync::Arc; use std::time::Duration; use std::{fmt, io}; @@ -56,7 +57,7 @@ pub(crate) fn evaluate_query( Rc::new(dataset), base_iri.map(Rc::new), options.service_handler(), - Rc::new(options.custom_functions), + Arc::new(options.custom_functions), run_stats, ) .evaluate_select(&pattern); @@ -76,7 +77,7 @@ pub(crate) fn evaluate_query( Rc::new(dataset), base_iri.map(Rc::new), options.service_handler(), - Rc::new(options.custom_functions), + Arc::new(options.custom_functions), run_stats, ) .evaluate_ask(&pattern); @@ -99,7 +100,7 @@ pub(crate) fn evaluate_query( Rc::new(dataset), base_iri.map(Rc::new), options.service_handler(), - Rc::new(options.custom_functions), + Arc::new(options.custom_functions), run_stats, ) .evaluate_construct(&pattern, &template); @@ -119,7 +120,7 @@ pub(crate) fn evaluate_query( Rc::new(dataset), base_iri.map(Rc::new), options.service_handler(), - Rc::new(options.custom_functions), + Arc::new(options.custom_functions), run_stats, ) .evaluate_describe(&pattern); @@ -155,19 +156,22 @@ pub(crate) fn evaluate_query( /// ``` #[derive(Clone, Default)] pub struct QueryOptions { - service_handler: Option>>, - custom_functions: HashMap Option>>, + service_handler: Option>>, + custom_functions: CustomFunctionRegistry, http_timeout: Option, http_redirection_limit: usize, without_optimizations: bool, } +pub(crate) type CustomFunctionRegistry = + HashMap Option) + Send + Sync>>; + impl QueryOptions { /// Use a given [`ServiceHandler`] to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls. #[inline] #[must_use] pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self { - self.service_handler = Some(Rc::new(ErrorConversionServiceHandler::wrap( + self.service_handler = Some(Arc::new(ErrorConversionServiceHandler::wrap( service_handler, ))); self @@ -177,7 +181,7 @@ impl QueryOptions { #[inline] #[must_use] pub fn without_service_handler(mut self) -> Self { - self.service_handler = Some(Rc::new(EmptyServiceHandler)); + self.service_handler = Some(Arc::new(EmptyServiceHandler)); self } @@ -227,21 +231,21 @@ impl QueryOptions { pub fn with_custom_function( mut self, name: NamedNode, - evaluator: impl Fn(&[Term]) -> Option + 'static, + evaluator: impl Fn(&[Term]) -> Option + Send + Sync + 'static, ) -> Self { - self.custom_functions.insert(name, Rc::new(evaluator)); + self.custom_functions.insert(name, Arc::new(evaluator)); self } - fn service_handler(&self) -> Rc> { + fn service_handler(&self) -> Arc> { self.service_handler.clone().unwrap_or_else(|| { if cfg!(feature = "http-client") { - Rc::new(service::SimpleServiceHandler::new( + Arc::new(service::SimpleServiceHandler::new( self.http_timeout, self.http_redirection_limit, )) } else { - Rc::new(EmptyServiceHandler) + Arc::new(EmptyServiceHandler) } }) } diff --git a/lib/src/sparql/model.rs b/lib/src/sparql/model.rs index 0c7becc6..81712877 100644 --- a/lib/src/sparql/model.rs +++ b/lib/src/sparql/model.rs @@ -8,7 +8,7 @@ use crate::sparql::results::{ use oxrdf::{Variable, VariableRef}; pub use sparesults::QuerySolution; use std::io::{Read, Write}; -use std::rc::Rc; +use std::sync::Arc; /// Results of a [SPARQL query](https://www.w3.org/TR/sparql11-query/). pub enum QueryResults { @@ -170,22 +170,20 @@ impl From> for QueryResults { /// } /// # Result::<_,Box>::Ok(()) /// ``` -#[allow(clippy::rc_buffer)] pub struct QuerySolutionIter { - variables: Rc>, + variables: Arc<[Variable]>, iter: Box>>, } impl QuerySolutionIter { - #[allow(clippy::rc_buffer)] pub fn new( - variables: Rc>, + variables: Arc<[Variable]>, iter: impl Iterator>, EvaluationError>> + 'static, ) -> Self { Self { - variables: Rc::clone(&variables), + variables: Arc::clone(&variables), iter: Box::new( - iter.map(move |t| t.map(|values| (Rc::clone(&variables), values).into())), + iter.map(move |t| t.map(|values| (Arc::clone(&variables), values).into())), ), } } @@ -211,7 +209,7 @@ impl QuerySolutionIter { impl From> for QuerySolutionIter { fn from(reader: FromReadSolutionsReader) -> Self { Self { - variables: Rc::new(reader.variables().to_vec()), + variables: reader.variables().into(), iter: Box::new(reader.map(|t| t.map_err(EvaluationError::from))), } } @@ -291,10 +289,12 @@ mod tests { QueryResults::Boolean(true), QueryResults::Boolean(false), QueryResults::Solutions(QuerySolutionIter::new( - Rc::new(vec![ + [ Variable::new_unchecked("foo"), Variable::new_unchecked("bar"), - ]), + ] + .as_ref() + .into(), Box::new( vec![ Ok(vec![None, None]), diff --git a/lib/src/sparql/service.rs b/lib/src/sparql/service.rs index dec189ae..2ec0b7a3 100644 --- a/lib/src/sparql/service.rs +++ b/lib/src/sparql/service.rs @@ -48,7 +48,7 @@ use std::time::Duration; /// } /// # Result::<_,Box>::Ok(()) /// ``` -pub trait ServiceHandler { +pub trait ServiceHandler: Send + Sync { type Error: Error + Send + Sync + 'static; /// Evaluates a [`Query`] against a given service identified by a [`NamedNode`]. diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index 1744c464..2e318c71 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -19,6 +19,7 @@ use sparopt::Optimizer; use std::collections::HashMap; use std::io; use std::rc::Rc; +use std::sync::Arc; pub fn evaluate_update<'a, 'b: 'a>( transaction: &'a mut StorageWriter<'b>, @@ -131,7 +132,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> { Rc::clone(&dataset), self.base_iri.clone(), self.options.query_options.service_handler(), - Rc::new(self.options.query_options.custom_functions.clone()), + Arc::new(self.options.query_options.custom_functions.clone()), false, ); let mut variables = Vec::new(); diff --git a/lib/src/storage/numeric_encoder.rs b/lib/src/storage/numeric_encoder.rs index 59f7532f..fd0d7544 100644 --- a/lib/src/storage/numeric_encoder.rs +++ b/lib/src/storage/numeric_encoder.rs @@ -8,8 +8,8 @@ use siphasher::sip128::{Hasher128, SipHasher24}; use std::fmt::Debug; use std::hash::Hash; use std::hash::Hasher; -use std::rc::Rc; use std::str; +use std::sync::Arc; #[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)] #[repr(transparent)] @@ -96,7 +96,7 @@ pub enum EncodedTerm { DurationLiteral(Duration), YearMonthDurationLiteral(YearMonthDuration), DayTimeDurationLiteral(DayTimeDuration), - Triple(Rc), + Triple(Arc), } impl PartialEq for EncodedTerm { @@ -471,7 +471,7 @@ impl From for EncodedTerm { impl From for EncodedTerm { fn from(value: EncodedTriple) -> Self { - Self::Triple(Rc::new(value)) + Self::Triple(Arc::new(value)) } } @@ -634,7 +634,7 @@ impl From> for EncodedTerm { impl From> for EncodedTerm { fn from(triple: TripleRef<'_>) -> Self { - Self::Triple(Rc::new(triple.into())) + Self::Triple(Arc::new(triple.into())) } }