Adds Tokio async to SPARQL XML results parser

pull/776/merge
Tpt 9 months ago committed by Thomas Tanon
parent c13cb8db7c
commit 7d45ea43f5
  1. 6
      lib/sparesults/src/error.rs
  2. 27
      lib/sparesults/src/parser.rs
  3. 862
      lib/sparesults/src/xml.rs

@ -47,6 +47,12 @@ impl From<quick_xml::Error> for QueryResultsParseError {
} }
} }
impl From<quick_xml::escape::EscapeError> for QueryResultsParseError {
#[inline]
fn from(error: quick_xml::escape::EscapeError) -> Self {
quick_xml::Error::from(error).into()
}
}
/// An error in the syntax of the parsed file. /// An error in the syntax of the parsed file.
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[error(transparent)] #[error(transparent)]

@ -7,7 +7,9 @@ use crate::json::{
FromTokioAsyncReadJsonQueryResultsReader, FromTokioAsyncReadJsonSolutionsReader, FromTokioAsyncReadJsonQueryResultsReader, FromTokioAsyncReadJsonSolutionsReader,
}; };
use crate::solution::QuerySolution; use crate::solution::QuerySolution;
use crate::xml::{XmlQueryResultsReader, XmlSolutionsReader}; use crate::xml::{FromReadXmlQueryResultsReader, FromReadXmlSolutionsReader};
#[cfg(feature = "async-tokio")]
use crate::xml::{FromTokioAsyncReadXmlQueryResultsReader, FromTokioAsyncReadXmlSolutionsReader};
use oxrdf::Variable; use oxrdf::Variable;
use std::io::Read; use std::io::Read;
use std::sync::Arc; use std::sync::Arc;
@ -81,9 +83,9 @@ impl QueryResultsParser {
reader: R, reader: R,
) -> Result<FromReadQueryResultsReader<R>, QueryResultsParseError> { ) -> Result<FromReadQueryResultsReader<R>, QueryResultsParseError> {
Ok(match self.format { Ok(match self.format {
QueryResultsFormat::Xml => match XmlQueryResultsReader::read(reader)? { QueryResultsFormat::Xml => match FromReadXmlQueryResultsReader::read(reader)? {
XmlQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r), FromReadXmlQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r),
XmlQueryResultsReader::Solutions { FromReadXmlQueryResultsReader::Solutions {
solutions, solutions,
variables, variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
@ -128,7 +130,7 @@ impl QueryResultsParser {
/// Reads are automatically buffered. /// Reads are automatically buffered.
/// ///
/// Example in XML (the API is the same for JSON and TSV): /// Example in XML (the API is the same for JSON and TSV):
/// ```no_run /// ```
/// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader}; /// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader};
/// use oxrdf::{Literal, Variable}; /// use oxrdf::{Literal, Variable};
/// ///
@ -157,7 +159,16 @@ impl QueryResultsParser {
reader: R, reader: R,
) -> Result<FromTokioAsyncReadQueryResultsReader<R>, QueryResultsParseError> { ) -> Result<FromTokioAsyncReadQueryResultsReader<R>, QueryResultsParseError> {
Ok(match self.format { Ok(match self.format {
QueryResultsFormat::Xml => return Err(QueryResultsSyntaxError::msg("The XML query results parser does not support Tokio AsyncRead yet").into()), QueryResultsFormat::Xml => match FromTokioAsyncReadXmlQueryResultsReader::read(reader).await? {
FromTokioAsyncReadXmlQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r),
FromTokioAsyncReadXmlQueryResultsReader::Solutions {
solutions,
variables,
} => FromTokioAsyncReadQueryResultsReader::Solutions(FromTokioAsyncReadSolutionsReader {
variables: variables.into(),
solutions: FromTokioAsyncReadSolutionsReaderKind::Xml(solutions),
}),
},
QueryResultsFormat::Json => match FromTokioAsyncReadJsonQueryResultsReader::read(reader).await? { QueryResultsFormat::Json => match FromTokioAsyncReadJsonQueryResultsReader::read(reader).await? {
FromTokioAsyncReadJsonQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r), FromTokioAsyncReadJsonQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r),
FromTokioAsyncReadJsonQueryResultsReader::Solutions { FromTokioAsyncReadJsonQueryResultsReader::Solutions {
@ -248,7 +259,7 @@ pub struct FromReadSolutionsReader<R: Read> {
} }
enum FromReadSolutionsReaderKind<R: Read> { enum FromReadSolutionsReaderKind<R: Read> {
Xml(XmlSolutionsReader<R>), Xml(FromReadXmlSolutionsReader<R>),
Json(FromReadJsonSolutionsReader<R>), Json(FromReadJsonSolutionsReader<R>),
Tsv(TsvSolutionsReader<R>), Tsv(TsvSolutionsReader<R>),
} }
@ -381,6 +392,7 @@ pub struct FromTokioAsyncReadSolutionsReader<R: AsyncRead + Unpin> {
#[cfg(feature = "async-tokio")] #[cfg(feature = "async-tokio")]
enum FromTokioAsyncReadSolutionsReaderKind<R: AsyncRead + Unpin> { enum FromTokioAsyncReadSolutionsReaderKind<R: AsyncRead + Unpin> {
Json(FromTokioAsyncReadJsonSolutionsReader<R>), Json(FromTokioAsyncReadJsonSolutionsReader<R>),
Xml(FromTokioAsyncReadXmlSolutionsReader<R>),
} }
#[cfg(feature = "async-tokio")] #[cfg(feature = "async-tokio")]
@ -422,6 +434,7 @@ impl<R: AsyncRead + Unpin> FromTokioAsyncReadSolutionsReader<R> {
Some( Some(
match &mut self.solutions { match &mut self.solutions {
FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await, FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await,
FromTokioAsyncReadSolutionsReaderKind::Xml(reader) => reader.read_next().await,
} }
.transpose()? .transpose()?
.map(|values| (Arc::clone(&self.variables), values).into()), .map(|values| (Arc::clone(&self.variables), values).into()),

@ -3,15 +3,15 @@
use crate::error::{QueryResultsParseError, QueryResultsSyntaxError}; use crate::error::{QueryResultsParseError, QueryResultsSyntaxError};
use oxrdf::vocab::rdf; use oxrdf::vocab::rdf;
use oxrdf::*; use oxrdf::*;
use quick_xml::escape::unescape;
use quick_xml::events::{BytesDecl, BytesEnd, BytesStart, BytesText, Event}; use quick_xml::events::{BytesDecl, BytesEnd, BytesStart, BytesText, Event};
use quick_xml::{Reader, Writer}; use quick_xml::{Decoder, Reader, Writer};
use std::borrow::Cow;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::io::{self, BufReader, Read, Write}; use std::io::{self, BufReader, Read, Write};
use std::str; use std::mem::take;
use std::sync::Arc; use std::sync::Arc;
#[cfg(feature = "async-tokio")] #[cfg(feature = "async-tokio")]
use tokio::io::AsyncWrite; use tokio::io::{AsyncRead, AsyncWrite, BufReader as AsyncBufReader};
pub fn write_boolean_xml_result<W: Write>(write: W, value: bool) -> io::Result<W> { pub fn write_boolean_xml_result<W: Write>(write: W, value: bool) -> io::Result<W> {
let mut writer = Writer::new(write); let mut writer = Writer::new(write);
@ -154,7 +154,6 @@ impl InnerXmlSolutionsWriter {
} }
#[allow(clippy::unused_self)] #[allow(clippy::unused_self)]
fn write<'a>( fn write<'a>(
&self, &self,
output: &mut Vec<Event<'a>>, output: &mut Vec<Event<'a>>,
@ -218,123 +217,276 @@ fn write_xml_term<'a>(output: &mut Vec<Event<'a>>, term: TermRef<'a>) {
} }
} }
pub enum XmlQueryResultsReader<R: Read> { pub enum FromReadXmlQueryResultsReader<R: Read> {
Solutions { Solutions {
variables: Vec<Variable>, variables: Vec<Variable>,
solutions: XmlSolutionsReader<R>, solutions: FromReadXmlSolutionsReader<R>,
}, },
Boolean(bool), Boolean(bool),
} }
impl<R: Read> XmlQueryResultsReader<R> { impl<R: Read> FromReadXmlQueryResultsReader<R> {
pub fn read(source: R) -> Result<Self, QueryResultsParseError> { pub fn read(read: R) -> Result<Self, QueryResultsParseError> {
enum State { let mut reader = Reader::from_reader(BufReader::new(read));
Start, reader.trim_text(true);
Sparql, reader.expand_empty_elements(true);
Head, let mut reader_buffer = Vec::new();
AfterHead, let mut inner = XmlInnerQueryResultsReader {
Boolean, state: ResultsState::Start,
variables: Vec::new(),
decoder: reader.decoder(),
};
loop {
reader_buffer.clear();
let event = reader.read_event_into(&mut reader_buffer)?;
if let Some(result) = inner.read_event(event)? {
return Ok(match result {
XmlInnerQueryResults::Solutions {
variables,
solutions,
} => Self::Solutions {
variables,
solutions: FromReadXmlSolutionsReader {
reader,
inner: solutions,
reader_buffer,
},
},
XmlInnerQueryResults::Boolean(value) => Self::Boolean(value),
});
}
}
}
}
pub struct FromReadXmlSolutionsReader<R: Read> {
reader: Reader<BufReader<R>>,
inner: XmlInnerSolutionsReader,
reader_buffer: Vec<u8>,
}
impl<R: Read> FromReadXmlSolutionsReader<R> {
pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
loop {
self.reader_buffer.clear();
let event = self.reader.read_event_into(&mut self.reader_buffer)?;
if event == Event::Eof {
return Ok(None);
}
if let Some(solution) = self.inner.read_event(event)? {
return Ok(Some(solution));
}
} }
}
}
let mut reader = Reader::from_reader(BufReader::new(source)); #[cfg(feature = "async-tokio")]
pub enum FromTokioAsyncReadXmlQueryResultsReader<R: AsyncRead + Unpin> {
Solutions {
variables: Vec<Variable>,
solutions: FromTokioAsyncReadXmlSolutionsReader<R>,
},
Boolean(bool),
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadXmlQueryResultsReader<R> {
pub async fn read(read: R) -> Result<Self, QueryResultsParseError> {
let mut reader = Reader::from_reader(AsyncBufReader::new(read));
reader.trim_text(true); reader.trim_text(true);
reader.expand_empty_elements(true); reader.expand_empty_elements(true);
let mut reader_buffer = Vec::new();
let mut inner = XmlInnerQueryResultsReader {
state: ResultsState::Start,
variables: Vec::new(),
decoder: reader.decoder(),
};
loop {
reader_buffer.clear();
let event = reader.read_event_into_async(&mut reader_buffer).await?;
if let Some(result) = inner.read_event(event)? {
return Ok(match result {
XmlInnerQueryResults::Solutions {
variables,
solutions,
} => Self::Solutions {
variables,
solutions: FromTokioAsyncReadXmlSolutionsReader {
reader,
inner: solutions,
reader_buffer,
},
},
XmlInnerQueryResults::Boolean(value) => Self::Boolean(value),
});
}
}
}
}
let mut buffer = Vec::default(); #[cfg(feature = "async-tokio")]
let mut variables = Vec::default(); pub struct FromTokioAsyncReadXmlSolutionsReader<R: AsyncRead + Unpin> {
let mut state = State::Start; reader: Reader<AsyncBufReader<R>>,
inner: XmlInnerSolutionsReader,
reader_buffer: Vec<u8>,
}
// Read header #[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadXmlSolutionsReader<R> {
pub async fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
loop { loop {
buffer.clear(); self.reader_buffer.clear();
let event = reader.read_event_into(&mut buffer)?; let event = self
match event { .reader
Event::Start(event) => match state { .read_event_into_async(&mut self.reader_buffer)
State::Start => { .await?;
if event.local_name().as_ref() == b"sparql" { if event == Event::Eof {
state = State::Sparql; return Ok(None);
} else { }
return Err(QueryResultsSyntaxError::msg(format!("Expecting <sparql> tag, found <{}>", decode(&reader, &event.name())?)).into()); if let Some(solution) = self.inner.read_event(event)? {
} return Ok(Some(solution));
}
}
}
}
enum XmlInnerQueryResults {
Solutions {
variables: Vec<Variable>,
solutions: XmlInnerSolutionsReader,
},
Boolean(bool),
}
#[derive(Clone, Copy)]
enum ResultsState {
Start,
Sparql,
Head,
AfterHead,
Boolean,
}
struct XmlInnerQueryResultsReader {
state: ResultsState,
variables: Vec<Variable>,
decoder: Decoder,
}
impl XmlInnerQueryResultsReader {
pub fn read_event(
&mut self,
event: Event<'_>,
) -> Result<Option<XmlInnerQueryResults>, QueryResultsParseError> {
match event {
Event::Start(event) => match self.state {
ResultsState::Start => {
if event.local_name().as_ref() == b"sparql" {
self.state = ResultsState::Sparql;
Ok(None)
} else {
Err(QueryResultsSyntaxError::msg(format!("Expecting <sparql> tag, found <{}>", self.decoder.decode(event.name().as_ref())?)).into())
} }
State::Sparql => { }
if event.local_name().as_ref() == b"head" { ResultsState::Sparql => {
state = State::Head; if event.local_name().as_ref() == b"head" {
} else { self.state = ResultsState::Head;
return Err(QueryResultsSyntaxError::msg(format!("Expecting <head> tag, found <{}>", decode(&reader, &event.name())?)).into()); Ok(None)
} } else {
Err(QueryResultsSyntaxError::msg(format!("Expecting <head> tag, found <{}>", self.decoder.decode(event.name().as_ref())?)).into())
} }
State::Head => { }
if event.local_name().as_ref() == b"variable" { ResultsState::Head => {
let name = event.attributes() if event.local_name().as_ref() == b"variable" {
.filter_map(Result::ok) let name = event.attributes()
.find(|attr| attr.key.local_name().as_ref() == b"name") .filter_map(Result::ok)
.ok_or_else(|| QueryResultsSyntaxError::msg("No name attribute found for the <variable> tag"))? .find(|attr| attr.key.local_name().as_ref() == b"name")
.decode_and_unescape_value(&reader)?; .ok_or_else(|| QueryResultsSyntaxError::msg("No name attribute found for the <variable> tag"))?;
let variable = Variable::new(name).map_err(|e| QueryResultsSyntaxError::msg(format!("Invalid variable name: {e}")))?; let name = unescape(&self.decoder.decode(&name.value)?)?.into_owned();
if variables.contains(&variable) { let variable = Variable::new(name).map_err(|e| QueryResultsSyntaxError::msg(format!("Invalid variable name: {e}")))?;
return Err(QueryResultsSyntaxError::msg(format!( if self.variables.contains(&variable) {
"The variable {variable} is declared twice" return Err(QueryResultsSyntaxError::msg(format!(
)) "The variable {variable} is declared twice"
.into()); ))
} .into());
variables.push(variable);
} else if event.local_name().as_ref() == b"link" {
// no op
} else {
return Err(QueryResultsSyntaxError::msg(format!("Expecting <variable> or <link> tag, found <{}>", decode(&reader, &event.name())?)).into());
} }
self.variables.push(variable);
Ok(None)
} else if event.local_name().as_ref() == b"link" {
// no op
Ok(None)
} else {
Err(QueryResultsSyntaxError::msg(format!("Expecting <variable> or <link> tag, found <{}>", self.decoder.decode(event.name().as_ref())?)).into())
} }
State::AfterHead => { }
if event.local_name().as_ref() == b"boolean" { ResultsState::AfterHead => {
state = State::Boolean if event.local_name().as_ref() == b"boolean" {
} else if event.local_name().as_ref() == b"results" { self.state = ResultsState::Boolean;
let mut mapping = BTreeMap::default(); Ok(None)
for (i, var) in variables.iter().enumerate() { } else if event.local_name().as_ref() == b"results" {
mapping.insert(var.clone().into_string(), i); let mut mapping = BTreeMap::default();
} for (i, var) in self.variables.iter().enumerate() {
return Ok(Self::Solutions { variables, mapping.insert(var.clone().into_string(), i);
solutions: XmlSolutionsReader {
reader,
buffer,
mapping,
stack: Vec::new(),
subject_stack: Vec::new(),
predicate_stack: Vec::new(),
object_stack: Vec::new(),
}});
} else if event.local_name().as_ref() != b"link" && event.local_name().as_ref() != b"results" && event.local_name().as_ref() != b"boolean" {
return Err(QueryResultsSyntaxError::msg(format!("Expecting sparql tag, found <{}>", decode(&reader, &event.name())?)).into());
} }
Ok(Some(XmlInnerQueryResults::Solutions {
variables: take(&mut self.variables),
solutions: XmlInnerSolutionsReader {
decoder: self.decoder,
mapping,
state_stack: vec![State::Start, State::Start],
new_bindings: Vec::new(),
current_var: None,
term: None,
lang: None,
datatype: None,
subject_stack: Vec::new(),
predicate_stack: Vec::new(),
object_stack: Vec::new(),
},
}))
} else if event.local_name().as_ref() != b"link" && event.local_name().as_ref() != b"results" && event.local_name().as_ref() != b"boolean" {
Err(QueryResultsSyntaxError::msg(format!("Expecting sparql tag, found <{}>", self.decoder.decode(event.name().as_ref())?)).into())
} else {
Ok(None)
} }
State::Boolean => return Err(QueryResultsSyntaxError::msg(format!("Unexpected tag inside of <boolean> tag: <{}>", decode(&reader, &event.name())?)).into()) }
}, ResultsState::Boolean => Err(QueryResultsSyntaxError::msg(format!("Unexpected tag inside of <boolean> tag: <{}>", self.decoder.decode(event.name().as_ref())?)).into())
Event::Text(event) => { },
let value = event.unescape()?; Event::Text(event) => {
return match state { let value = event.unescape()?;
State::Boolean => { match self.state {
return if value == "true" { ResultsState::Boolean => {
Ok(Self::Boolean(true)) if value == "true" {
} else if value == "false" { Ok(Some(XmlInnerQueryResults::Boolean(true)))
Ok(Self::Boolean(false)) } else if value == "false" {
} else { Ok(Some(XmlInnerQueryResults::Boolean(false)))
Err(QueryResultsSyntaxError::msg(format!("Unexpected boolean value. Found '{value}'")).into()) } else {
}; Err(QueryResultsSyntaxError::msg(format!("Unexpected boolean value. Found '{value}'")).into())
}
_ => Err(QueryResultsSyntaxError::msg(format!("Unexpected textual value found: '{value}'")).into())
};
},
Event::End(event) => {
if let State::Head = state {
if event.local_name().as_ref() == b"head" {
state = State::AfterHead
} }
} else {
return Err(QueryResultsSyntaxError::msg("Unexpected early file end. All results file should have a <head> and a <result> or <boolean> tag").into());
} }
}, _ => Err(QueryResultsSyntaxError::msg(format!("Unexpected textual value found: '{value}'")).into())
Event::Eof => return Err(QueryResultsSyntaxError::msg("Unexpected early file end. All results file should have a <head> and a <result> or <boolean> tag").into()), }
_ => (), }
Event::End(event) => {
if let ResultsState::Head = self.state {
if event.local_name().as_ref() == b"head" {
self.state = ResultsState::AfterHead
}
Ok(None)
} else {
Err(QueryResultsSyntaxError::msg("Unexpected early file end. All results file should have a <head> and a <result> or <boolean> tag").into())
}
}
Event::Eof => Err(QueryResultsSyntaxError::msg("Unexpected early file end. All results file should have a <head> and a <result> or <boolean> tag").into()),
Event::Comment(_) | Event::Decl(_) | Event::PI(_) | Event::DocType(_) => {
Ok(None)
}
Event::Empty(_) => unreachable!("Empty events are expended"),
Event::CData(_) => {
Err(QueryResultsSyntaxError::msg(
"<![CDATA[...]]> are not supported in SPARQL XML results",
)
.into())
} }
} }
} }
@ -351,285 +503,294 @@ enum State {
Subject, Subject,
Predicate, Predicate,
Object, Object,
End,
} }
pub struct XmlSolutionsReader<R: Read> { struct XmlInnerSolutionsReader {
reader: Reader<BufReader<R>>, decoder: Decoder,
buffer: Vec<u8>,
mapping: BTreeMap<String, usize>, mapping: BTreeMap<String, usize>,
stack: Vec<State>, state_stack: Vec<State>,
new_bindings: Vec<Option<Term>>,
current_var: Option<String>,
term: Option<Term>,
lang: Option<String>,
datatype: Option<NamedNode>,
subject_stack: Vec<Term>, subject_stack: Vec<Term>,
predicate_stack: Vec<Term>, predicate_stack: Vec<Term>,
object_stack: Vec<Term>, object_stack: Vec<Term>,
} }
impl<R: Read> XmlSolutionsReader<R> { impl XmlInnerSolutionsReader {
pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> { #[allow(clippy::unwrap_in_result)]
let mut state = State::Start; pub fn read_event(
&mut self,
let mut new_bindings = vec![None; self.mapping.len()]; event: Event<'_>,
) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
let mut current_var = None; match event {
let mut term: Option<Term> = None; Event::Start(event) => match self.state_stack.last().unwrap() {
let mut lang = None; State::Start => {
let mut datatype = None; if event.local_name().as_ref() == b"result" {
loop { self.new_bindings = vec![None; self.mapping.len()];
self.buffer.clear(); self.state_stack.push(State::Result);
let event = self.reader.read_event_into(&mut self.buffer)?; Ok(None)
match event { } else {
Event::Start(event) => match state { Err(QueryResultsSyntaxError::msg(format!(
State::Start => { "Expecting <result>, found <{}>",
if event.local_name().as_ref() == b"result" { self.decoder.decode(event.name().as_ref())?
state = State::Result; ))
} else { .into())
return Err(QueryResultsSyntaxError::msg(format!(
"Expecting <result>, found <{}>",
decode(&self.reader, &event.name())?
))
.into());
}
}
State::Result => {
if event.local_name().as_ref() == b"binding" {
match event
.attributes()
.filter_map(Result::ok)
.find(|attr| attr.key.local_name().as_ref() == b"name")
{
Some(attr) => {
current_var = Some(
attr.decode_and_unescape_value(&self.reader)?.to_string(),
)
}
None => {
return Err(QueryResultsSyntaxError::msg(
"No name attribute found for the <binding> tag",
)
.into());
}
}
state = State::Binding;
} else {
return Err(QueryResultsSyntaxError::msg(format!(
"Expecting <binding>, found <{}>",
decode(&self.reader, &event.name())?
))
.into());
}
} }
State::Binding | State::Subject | State::Predicate | State::Object => { }
if term.is_some() { State::Result => {
if event.local_name().as_ref() == b"binding" {
let Some(attr) = event
.attributes()
.filter_map(Result::ok)
.find(|attr| attr.key.local_name().as_ref() == b"name")
else {
return Err(QueryResultsSyntaxError::msg( return Err(QueryResultsSyntaxError::msg(
"There is already a value for the current binding", "No name attribute found for the <binding> tag",
) )
.into()); .into());
} };
self.stack.push(state); self.current_var =
if event.local_name().as_ref() == b"uri" { Some(unescape(&self.decoder.decode(&attr.value)?)?.into_owned());
state = State::Uri; self.state_stack.push(State::Binding);
} else if event.local_name().as_ref() == b"bnode" { Ok(None)
state = State::BNode; } else {
} else if event.local_name().as_ref() == b"literal" { Err(QueryResultsSyntaxError::msg(format!(
for attr in event.attributes() { "Expecting <binding>, found <{}>",
let attr = attr.map_err(quick_xml::Error::from)?; self.decoder.decode(event.name().as_ref())?
if attr.key.as_ref() == b"xml:lang" { ))
lang = Some( .into())
attr.decode_and_unescape_value(&self.reader)?.to_string(),
);
} else if attr.key.local_name().as_ref() == b"datatype" {
let iri = attr.decode_and_unescape_value(&self.reader)?;
datatype =
Some(NamedNode::new(iri.to_string()).map_err(|e| {
QueryResultsSyntaxError::msg(format!(
"Invalid datatype IRI '{iri}': {e}"
))
})?);
}
}
state = State::Literal;
} else if event.local_name().as_ref() == b"triple" {
state = State::Triple;
} else {
return Err(QueryResultsSyntaxError::msg(format!(
"Expecting <uri>, <bnode> or <literal> found <{}>",
decode(&self.reader, &event.name())?
))
.into());
}
} }
State::Triple => { }
if event.local_name().as_ref() == b"subject" { State::Binding | State::Subject | State::Predicate | State::Object => {
state = State::Subject if self.term.is_some() {
} else if event.local_name().as_ref() == b"predicate" { return Err(QueryResultsSyntaxError::msg(
state = State::Predicate "There is already a value for the current binding",
} else if event.local_name().as_ref() == b"object" { )
state = State::Object .into());
} else {
return Err(QueryResultsSyntaxError::msg(format!(
"Expecting <subject>, <predicate> or <object> found <{}>",
decode(&self.reader, &event.name())?
))
.into());
}
} }
_ => (), if event.local_name().as_ref() == b"uri" {
}, self.state_stack.push(State::Uri);
Event::Text(event) => { Ok(None)
let data = event.unescape()?; } else if event.local_name().as_ref() == b"bnode" {
match state { self.state_stack.push(State::BNode);
State::Uri => { Ok(None)
term = Some( } else if event.local_name().as_ref() == b"literal" {
NamedNode::new(data.to_string()) for attr in event.attributes() {
.map_err(|e| { let attr = attr.map_err(quick_xml::Error::from)?;
QueryResultsSyntaxError::msg(format!( if attr.key.as_ref() == b"xml:lang" {
"Invalid IRI value '{data}': {e}" self.lang = Some(
)) unescape(&self.decoder.decode(&attr.value)?)?.into_owned(),
})? );
.into(), } else if attr.key.local_name().as_ref() == b"datatype" {
) let iri = self.decoder.decode(&attr.value)?;
} let iri = unescape(&iri)?;
State::BNode => { self.datatype =
term = Some( Some(NamedNode::new(iri.as_ref()).map_err(|e| {
BlankNode::new(data.to_string())
.map_err(|e| {
QueryResultsSyntaxError::msg(format!( QueryResultsSyntaxError::msg(format!(
"Invalid blank node value '{data}': {e}" "Invalid datatype IRI '{iri}': {e}"
)) ))
})? })?);
.into(),
)
}
State::Literal => {
term = Some(build_literal(data, lang.take(), datatype.take())?.into());
}
_ => {
return Err(QueryResultsSyntaxError::msg(format!(
"Unexpected textual value found: {data}"
))
.into());
}
}
}
Event::End(_) => match state {
State::Start => state = State::End,
State::Result => return Ok(Some(new_bindings)),
State::Binding => {
if let Some(var) = &current_var {
if let Some(var) = self.mapping.get(var) {
new_bindings[*var] = term.take()
} else {
return Err(
QueryResultsSyntaxError::msg(format!("The variable '{var}' is used in a binding but not declared in the variables list")).into()
);
} }
} else {
return Err(QueryResultsSyntaxError::msg(
"No name found for <binding> tag",
)
.into());
}
state = State::Result;
}
State::Subject => {
if let Some(subject) = term.take() {
self.subject_stack.push(subject)
} }
state = State::Triple; self.state_stack.push(State::Literal);
} Ok(None)
State::Predicate => { } else if event.local_name().as_ref() == b"triple" {
if let Some(predicate) = term.take() { self.state_stack.push(State::Triple);
self.predicate_stack.push(predicate) Ok(None)
} } else {
state = State::Triple; Err(QueryResultsSyntaxError::msg(format!(
"Expecting <uri>, <bnode> or <literal> found <{}>",
self.decoder.decode(event.name().as_ref())?
))
.into())
} }
State::Object => { }
if let Some(object) = term.take() { State::Triple => {
self.object_stack.push(object) if event.local_name().as_ref() == b"subject" {
} self.state_stack.push(State::Subject);
state = State::Triple; Ok(None)
} else if event.local_name().as_ref() == b"predicate" {
self.state_stack.push(State::Predicate);
Ok(None)
} else if event.local_name().as_ref() == b"object" {
self.state_stack.push(State::Object);
Ok(None)
} else {
Err(QueryResultsSyntaxError::msg(format!(
"Expecting <subject>, <predicate> or <object> found <{}>",
self.decoder.decode(event.name().as_ref())?
))
.into())
} }
}
State::Uri => Err(QueryResultsSyntaxError::msg(format!(
"<uri> must only contain a string, found <{}>",
self.decoder.decode(event.name().as_ref())?
))
.into()),
State::BNode => Err(QueryResultsSyntaxError::msg(format!(
"<uri> must only contain a string, found <{}>",
self.decoder.decode(event.name().as_ref())?
))
.into()),
State::Literal => Err(QueryResultsSyntaxError::msg(format!(
"<uri> must only contain a string, found <{}>",
self.decoder.decode(event.name().as_ref())?
))
.into()),
},
Event::Text(event) => {
let data = event.unescape()?;
match self.state_stack.last().unwrap() {
State::Uri => { State::Uri => {
state = self self.term = Some(
.stack NamedNode::new(data.to_string())
.pop() .map_err(|e| {
.ok_or_else(|| QueryResultsSyntaxError::msg("Empty stack"))? QueryResultsSyntaxError::msg(format!(
"Invalid IRI value '{data}': {e}"
))
})?
.into(),
);
Ok(None)
} }
State::BNode => { State::BNode => {
if term.is_none() { self.term = Some(
// We default to a random bnode BlankNode::new(data.to_string())
term = Some(BlankNode::default().into()) .map_err(|e| {
} QueryResultsSyntaxError::msg(format!(
state = self "Invalid blank node value '{data}': {e}"
.stack ))
.pop() })?
.ok_or_else(|| QueryResultsSyntaxError::msg("Empty stack"))? .into(),
);
Ok(None)
} }
State::Literal => { State::Literal => {
if term.is_none() { self.term = Some(
// We default to the empty literal build_literal(data, self.lang.take(), self.datatype.take())?.into(),
term = Some(build_literal("", lang.take(), datatype.take())?.into()) );
} Ok(None)
state = self
.stack
.pop()
.ok_or_else(|| QueryResultsSyntaxError::msg("Empty stack"))?;
} }
State::Triple => { _ => Err(QueryResultsSyntaxError::msg(format!(
#[cfg(feature = "rdf-star")] "Unexpected textual value found: {data}"
if let (Some(subject), Some(predicate), Some(object)) = ( ))
self.subject_stack.pop(), .into()),
self.predicate_stack.pop(), }
self.object_stack.pop(), }
) { Event::End(_) => match self.state_stack.pop().unwrap() {
term = Some( State::Start | State::Uri => Ok(None),
Triple::new( State::Result => Ok(Some(take(&mut self.new_bindings))),
match subject { State::Binding => {
Term::NamedNode(subject) => subject.into(), if let Some(var) = &self.current_var {
Term::BlankNode(subject) => subject.into(), if let Some(var) = self.mapping.get(var) {
Term::Triple(subject) => Subject::Triple(subject), self.new_bindings[*var] = self.term.take()
Term::Literal(_) => {
return Err(QueryResultsSyntaxError::msg(
"The <subject> value should not be a <literal>",
)
.into())
}
},
match predicate {
Term::NamedNode(predicate) => predicate,
_ => {
return Err(QueryResultsSyntaxError::msg(
"The <predicate> value should be an <uri>",
)
.into())
}
},
object,
)
.into(),
);
state = self
.stack
.pop()
.ok_or_else(|| QueryResultsSyntaxError::msg("Empty stack"))?;
} else { } else {
return Err( return Err(
QueryResultsSyntaxError::msg("A <triple> should contain a <subject>, a <predicate> and an <object>").into() QueryResultsSyntaxError::msg(format!("The variable '{var}' is used in a binding but not declared in the variables list")).into()
); );
} }
#[cfg(not(feature = "rdf-star"))] } else {
{ return Err(QueryResultsSyntaxError::msg(
return Err(QueryResultsSyntaxError::msg( "No name found for <binding> tag",
"The <triple> tag is only supported with RDF-star", )
.into());
}
Ok(None)
}
State::Subject => {
if let Some(subject) = self.term.take() {
self.subject_stack.push(subject)
}
Ok(None)
}
State::Predicate => {
if let Some(predicate) = self.term.take() {
self.predicate_stack.push(predicate)
}
Ok(None)
}
State::Object => {
if let Some(object) = self.term.take() {
self.object_stack.push(object)
}
Ok(None)
}
State::BNode => {
if self.term.is_none() {
// We default to a random bnode
self.term = Some(BlankNode::default().into())
}
Ok(None)
}
State::Literal => {
if self.term.is_none() {
// We default to the empty literal
self.term =
Some(build_literal("", self.lang.take(), self.datatype.take())?.into())
}
Ok(None)
}
State::Triple => {
#[cfg(feature = "rdf-star")]
if let (Some(subject), Some(predicate), Some(object)) = (
self.subject_stack.pop(),
self.predicate_stack.pop(),
self.object_stack.pop(),
) {
self.term = Some(
Triple::new(
match subject {
Term::NamedNode(subject) => subject.into(),
Term::BlankNode(subject) => subject.into(),
Term::Triple(subject) => Subject::Triple(subject),
Term::Literal(_) => {
return Err(QueryResultsSyntaxError::msg(
"The <subject> value should not be a <literal>",
)
.into());
}
},
match predicate {
Term::NamedNode(predicate) => predicate,
_ => {
return Err(QueryResultsSyntaxError::msg(
"The <predicate> value should be an <uri>",
)
.into());
}
},
object,
) )
.into()); .into(),
} );
Ok(None)
} else {
Err(QueryResultsSyntaxError::msg(
"A <triple> should contain a <subject>, a <predicate> and an <object>",
)
.into())
}
#[cfg(not(feature = "rdf-star"))]
{
Err(QueryResultsSyntaxError::msg(
"The <triple> tag is only supported with RDF-star",
)
.into())
} }
State::End => (), }
}, },
Event::Eof => return Ok(None), Event::Eof | Event::Comment(_) | Event::Decl(_) | Event::PI(_) | Event::DocType(_) => {
_ => (), Ok(None)
} }
Event::Empty(_) => unreachable!("Empty events are expended"),
Event::CData(_) => Err(QueryResultsSyntaxError::msg(
"<![CDATA[...]]> are not supported in SPARQL XML results",
)
.into()),
} }
} }
} }
@ -661,13 +822,6 @@ fn build_literal(
} }
} }
fn decode<'a, T>(
reader: &Reader<T>,
data: &'a impl AsRef<[u8]>,
) -> Result<Cow<'a, str>, QueryResultsParseError> {
Ok(reader.decoder().decode(data.as_ref())?)
}
fn map_xml_error(error: quick_xml::Error) -> io::Error { fn map_xml_error(error: quick_xml::Error) -> io::Error {
match error { match error {
quick_xml::Error::Io(error) => { quick_xml::Error::Io(error) => {

Loading…
Cancel
Save