Adds an async SPARQL JSON results reader

pull/779/merge
Tpt 9 months ago committed by Thomas Tanon
parent c277804026
commit e48b268fc5
  1. 1238
      lib/sparesults/src/json.rs
  2. 4
      lib/sparesults/src/lib.rs
  3. 234
      lib/sparesults/src/parser.rs

File diff suppressed because it is too large Load Diff

@ -16,5 +16,9 @@ mod xml;
pub use crate::error::{QueryResultsParseError, QueryResultsSyntaxError, TextPosition};
pub use crate::format::QueryResultsFormat;
pub use crate::parser::{FromReadQueryResultsReader, FromReadSolutionsReader, QueryResultsParser};
#[cfg(feature = "async-tokio")]
pub use crate::parser::{FromTokioAsyncReadQueryResultsReader, FromTokioAsyncReadSolutionsReader};
#[cfg(feature = "async-tokio")]
pub use crate::serializer::ToTokioAsyncWriteSolutionsWriter;
pub use crate::serializer::{QueryResultsSerializer, ToWriteSolutionsWriter};
pub use crate::solution::QuerySolution;

@ -1,12 +1,18 @@
use crate::csv::{TsvQueryResultsReader, TsvSolutionsReader};
use crate::error::{QueryResultsParseError, QueryResultsSyntaxError};
use crate::format::QueryResultsFormat;
use crate::json::{JsonQueryResultsReader, JsonSolutionsReader};
use crate::json::{FromReadJsonQueryResultsReader, FromReadJsonSolutionsReader};
#[cfg(feature = "async-tokio")]
use crate::json::{
FromTokioAsyncReadJsonQueryResultsReader, FromTokioAsyncReadJsonSolutionsReader,
};
use crate::solution::QuerySolution;
use crate::xml::{XmlQueryResultsReader, XmlSolutionsReader};
use oxrdf::Variable;
use std::io::Read;
use std::sync::Arc;
#[cfg(feature = "async-tokio")]
use tokio::io::AsyncRead;
/// Parsers for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats.
///
@ -45,24 +51,24 @@ impl QueryResultsParser {
Self { format }
}
/// Reads a result file.
/// Reads a result file from a [`Read`] implementation.
///
/// Reads are buffered.
/// Reads are automatically buffered.
///
/// Example in XML (the API is the same for JSON and TSV):
/// ```
/// use sparesults::{QueryResultsFormat, QueryResultsParser, FromReadQueryResultsReader};
/// use oxrdf::{Literal, Variable};
///
/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml);
/// let xml_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml);
///
/// // boolean
/// if let FromReadQueryResultsReader::Boolean(v) = json_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head/><boolean>true</boolean></sparql>"#.as_slice())? {
/// if let FromReadQueryResultsReader::Boolean(v) = xml_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head/><boolean>true</boolean></sparql>"#.as_slice())? {
/// assert_eq!(v, true);
/// }
///
/// // solutions
/// if let FromReadQueryResultsReader::Solutions(solutions) = json_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head><variable name="foo"/><variable name="bar"/></head><results><result><binding name="foo"><literal>test</literal></binding></result></results></sparql>"#.as_slice())? {
/// if let FromReadQueryResultsReader::Solutions(solutions) = xml_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head><variable name="foo"/><variable name="bar"/></head><results><result><binding name="foo"><literal>test</literal></binding></result></results></sparql>"#.as_slice())? {
/// assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]);
/// for solution in solutions {
/// assert_eq!(solution?.iter().collect::<Vec<_>>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]);
@ -82,17 +88,17 @@ impl QueryResultsParser {
variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
variables: variables.into(),
solutions: SolutionsReaderKind::Xml(solutions),
solutions: FromReadSolutionsReaderKind::Xml(solutions),
}),
},
QueryResultsFormat::Json => match JsonQueryResultsReader::read(reader)? {
JsonQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r),
JsonQueryResultsReader::Solutions {
QueryResultsFormat::Json => match FromReadJsonQueryResultsReader::read(reader)? {
FromReadJsonQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r),
FromReadJsonQueryResultsReader::Solutions {
solutions,
variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
variables: variables.into(),
solutions: SolutionsReaderKind::Json(solutions),
solutions: FromReadSolutionsReaderKind::Json(solutions),
}),
},
QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()),
@ -103,7 +109,7 @@ impl QueryResultsParser {
variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
variables: variables.into(),
solutions: SolutionsReaderKind::Tsv(solutions),
solutions: FromReadSolutionsReaderKind::Tsv(solutions),
}),
},
})
@ -116,6 +122,56 @@ impl QueryResultsParser {
) -> Result<FromReadQueryResultsReader<R>, QueryResultsParseError> {
self.parse_read(reader)
}
/// Reads a result file from a Tokio [`AsyncRead`] implementation.
///
/// Reads are automatically buffered.
///
/// Example in XML (the API is the same for JSON and TSV):
/// ```no_run
/// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader};
/// use oxrdf::{Literal, Variable};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
/// let xml_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml);
///
/// // boolean
/// if let FromTokioAsyncReadQueryResultsReader::Boolean(v) = xml_parser.parse_tokio_async_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head/><boolean>true</boolean></sparql>"#.as_slice()).await? {
/// assert_eq!(v, true);
/// }
///
/// // solutions
/// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = xml_parser.parse_tokio_async_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head><variable name="foo"/><variable name="bar"/></head><results><result><binding name="foo"><literal>test</literal></binding></result></results></sparql>"#.as_slice()).await? {
/// assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]);
/// while let Some(solution) = solutions.next().await {
/// assert_eq!(solution?.iter().collect::<Vec<_>>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]);
/// }
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub async fn parse_tokio_async_read<R: AsyncRead + Unpin>(
&self,
reader: R,
) -> Result<FromTokioAsyncReadQueryResultsReader<R>, QueryResultsParseError> {
Ok(match self.format {
QueryResultsFormat::Xml => return Err(QueryResultsSyntaxError::msg("The XML query results parser does not support Tokio AsyncRead yet").into()),
QueryResultsFormat::Json => match FromTokioAsyncReadJsonQueryResultsReader::read(reader).await? {
FromTokioAsyncReadJsonQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r),
FromTokioAsyncReadJsonQueryResultsReader::Solutions {
solutions,
variables,
} => FromTokioAsyncReadQueryResultsReader::Solutions(FromTokioAsyncReadSolutionsReader {
variables: variables.into(),
solutions: FromTokioAsyncReadSolutionsReaderKind::Json(solutions),
}),
},
QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()),
QueryResultsFormat::Tsv => return Err(QueryResultsSyntaxError::msg("The TSV query results parser does not support Tokio AsyncRead yet").into()),
})
}
}
impl From<QueryResultsFormat> for QueryResultsParser {
@ -133,16 +189,16 @@ impl From<QueryResultsFormat> for QueryResultsParser {
/// use oxrdf::{Literal, Variable};
/// use sparesults::{FromReadQueryResultsReader, QueryResultsFormat, QueryResultsParser};
///
/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
///
/// // boolean
/// if let FromReadQueryResultsReader::Boolean(v) = json_parser.parse_read(b"true".as_slice())? {
/// if let FromReadQueryResultsReader::Boolean(v) = tsv_parser.parse_read(b"true".as_slice())? {
/// assert_eq!(v, true);
/// }
///
/// // solutions
/// if let FromReadQueryResultsReader::Solutions(solutions) =
/// json_parser.parse_read(b"?foo\t?bar\n\"test\"\t".as_slice())?
/// tsv_parser.parse_read(b"?foo\t?bar\n\"test\"\t".as_slice())?
/// {
/// assert_eq!(
/// solutions.variables(),
@ -188,12 +244,12 @@ pub enum FromReadQueryResultsReader<R: Read> {
/// ```
pub struct FromReadSolutionsReader<R: Read> {
variables: Arc<[Variable]>,
solutions: SolutionsReaderKind<R>,
solutions: FromReadSolutionsReaderKind<R>,
}
enum SolutionsReaderKind<R: Read> {
enum FromReadSolutionsReaderKind<R: Read> {
Xml(XmlSolutionsReader<R>),
Json(JsonSolutionsReader<R>),
Json(FromReadJsonSolutionsReader<R>),
Tsv(TsvSolutionsReader<R>),
}
@ -205,9 +261,9 @@ impl<R: Read> FromReadSolutionsReader<R> {
/// use oxrdf::Variable;
/// use sparesults::{FromReadQueryResultsReader, QueryResultsFormat, QueryResultsParser};
///
/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
/// if let FromReadQueryResultsReader::Solutions(solutions) =
/// json_parser.parse_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())?
/// tsv_parser.parse_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())?
/// {
/// assert_eq!(
/// solutions.variables(),
@ -231,9 +287,141 @@ impl<R: Read> Iterator for FromReadSolutionsReader<R> {
fn next(&mut self) -> Option<Self::Item> {
Some(
match &mut self.solutions {
SolutionsReaderKind::Xml(reader) => reader.read_next(),
SolutionsReaderKind::Json(reader) => reader.read_next(),
SolutionsReaderKind::Tsv(reader) => reader.read_next(),
FromReadSolutionsReaderKind::Xml(reader) => reader.read_next(),
FromReadSolutionsReaderKind::Json(reader) => reader.read_next(),
FromReadSolutionsReaderKind::Tsv(reader) => reader.read_next(),
}
.transpose()?
.map(|values| (Arc::clone(&self.variables), values).into()),
)
}
}
/// The reader for a given read of a results file.
///
/// It is either a read boolean ([`bool`]) or a streaming reader of a set of solutions ([`FromReadSolutionsReader`]).
///
/// Example in TSV (the API is the same for JSON and XML):
/// ```no_run
/// use oxrdf::{Literal, Variable};
/// use sparesults::{
/// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser,
/// };
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
///
/// // boolean
/// if let FromTokioAsyncReadQueryResultsReader::Boolean(v) = tsv_parser
/// .parse_tokio_async_read(b"true".as_slice())
/// .await?
/// {
/// assert_eq!(v, true);
/// }
///
/// // solutions
/// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = tsv_parser
/// .parse_tokio_async_read(b"?foo\t?bar\n\"test\"\t".as_slice())
/// .await?
/// {
/// assert_eq!(
/// solutions.variables(),
/// &[
/// Variable::new_unchecked("foo"),
/// Variable::new_unchecked("bar")
/// ]
/// );
/// while let Some(solution) = solutions.next().await {
/// assert_eq!(
/// solution?.iter().collect::<Vec<_>>(),
/// vec![(
/// &Variable::new_unchecked("foo"),
/// &Literal::from("test").into()
/// )]
/// );
/// }
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub enum FromTokioAsyncReadQueryResultsReader<R: AsyncRead + Unpin> {
Solutions(FromTokioAsyncReadSolutionsReader<R>),
Boolean(bool),
}
/// A streaming reader of a set of [`QuerySolution`] solutions.
///
/// It implements the [`Iterator`] API to iterate over the solutions.
///
/// Example in JSON (the API is the same for XML and TSV):
/// ```
/// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader};
/// use oxrdf::{Literal, Variable};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Json);
/// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = json_parser.parse_tokio_async_read(br#"{"head":{"vars":["foo","bar"]},"results":{"bindings":[{"foo":{"type":"literal","value":"test"}}]}}"#.as_slice()).await? {
/// assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]);
/// while let Some(solution) = solutions.next().await {
/// assert_eq!(solution?.iter().collect::<Vec<_>>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]);
/// }
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadSolutionsReader<R: AsyncRead + Unpin> {
variables: Arc<[Variable]>,
solutions: FromTokioAsyncReadSolutionsReaderKind<R>,
}
#[cfg(feature = "async-tokio")]
enum FromTokioAsyncReadSolutionsReaderKind<R: AsyncRead + Unpin> {
Json(FromTokioAsyncReadJsonSolutionsReader<R>),
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadSolutionsReader<R> {
/// Ordered list of the declared variables at the beginning of the results.
///
/// Example in TSV (the API is the same for JSON and XML):
/// ```no_run
/// use oxrdf::Variable;
/// use sparesults::{
/// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser,
/// };
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
/// if let FromTokioAsyncReadQueryResultsReader::Solutions(solutions) = tsv_parser
/// .parse_tokio_async_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())
/// .await?
/// {
/// assert_eq!(
/// solutions.variables(),
/// &[
/// Variable::new_unchecked("foo"),
/// Variable::new_unchecked("bar")
/// ]
/// );
/// }
/// # Ok(())
/// # }
/// ```
#[inline]
pub fn variables(&self) -> &[Variable] {
&self.variables
}
/// Reads the next solution or returns `None` if the file is finished.
pub async fn next(&mut self) -> Option<Result<QuerySolution, QueryResultsParseError>> {
Some(
match &mut self.solutions {
FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await,
}
.transpose()?
.map(|values| (Arc::clone(&self.variables), values).into()),

Loading…
Cancel
Save