diff --git a/lib/sparesults/src/json.rs b/lib/sparesults/src/json.rs index 25da2d64..efed124a 100644 --- a/lib/sparesults/src/json.rs +++ b/lib/sparesults/src/json.rs @@ -1,20 +1,16 @@ //! Implementation of [SPARQL Query Results JSON Format](https://www.w3.org/TR/sparql11-results-json/) use crate::error::{QueryResultsParseError, QueryResultsSyntaxError}; -#[cfg(feature = "async-tokio")] -use json_event_parser::ToTokioAsyncWriteJsonWriter; use json_event_parser::{FromReadJsonReader, JsonEvent, ToWriteJsonWriter}; +#[cfg(feature = "async-tokio")] +use json_event_parser::{FromTokioAsyncReadJsonReader, ToTokioAsyncWriteJsonWriter}; use oxrdf::vocab::rdf; use oxrdf::*; use std::collections::BTreeMap; use std::io::{self, Read, Write}; use std::mem::take; #[cfg(feature = "async-tokio")] -use tokio::io::AsyncWrite; - -/// This limit is set in order to avoid stack overflow error when parsing nested triples due to too many recursive calls. -/// The actual limit value is a wet finger compromise between not failing to parse valid files and avoiding to trigger stack overflow errors. -const MAX_NUMBER_OF_NESTED_TRIPLES: usize = 128; +use tokio::io::{AsyncRead, AsyncWrite}; pub fn write_boolean_json_result(write: W, value: bool) -> io::Result { let mut writer = ToWriteJsonWriter::new(write); @@ -224,520 +220,882 @@ fn write_json_term<'a>(output: &mut Vec>, term: TermRef<'a>) { } } -pub enum JsonQueryResultsReader { +pub enum FromReadJsonQueryResultsReader { Solutions { variables: Vec, - solutions: JsonSolutionsReader, + solutions: FromReadJsonSolutionsReader, }, Boolean(bool), } -impl JsonQueryResultsReader { +impl FromReadJsonQueryResultsReader { pub fn read(read: R) -> Result { let mut reader = FromReadJsonReader::new(read); - let mut variables = None; - let mut buffered_bindings: Option> = None; - let mut output_iter = None; - - if reader.read_next_event()? 
!= JsonEvent::StartObject { - return Err( - QueryResultsSyntaxError::msg("SPARQL JSON results should be an object").into(), - ); + let mut inner = JsonInnerReader::new(); + loop { + if let Some(result) = inner.read_event(reader.read_next_event()?)? { + return match result { + JsonInnerQueryResults::Solutions { + variables, + solutions, + } => Ok(Self::Solutions { + variables, + solutions: FromReadJsonSolutionsReader { + inner: solutions, + reader, + }, + }), + JsonInnerQueryResults::Boolean(value) => Ok(Self::Boolean(value)), + }; + } } + } +} + +pub struct FromReadJsonSolutionsReader { + inner: JsonInnerSolutions, + reader: FromReadJsonReader, +} + +impl FromReadJsonSolutionsReader { + pub fn read_next(&mut self) -> Result>>, QueryResultsParseError> { + match &mut self.inner { + JsonInnerSolutions::Reader(reader) => loop { + let event = self.reader.read_next_event()?; + if event == JsonEvent::Eof { + return Ok(None); + } + if let Some(result) = reader.read_event(event)? { + return Ok(Some(result)); + } + }, + JsonInnerSolutions::Iterator(iter) => iter.next(), + } + } +} +#[cfg(feature = "async-tokio")] +pub enum FromTokioAsyncReadJsonQueryResultsReader { + Solutions { + variables: Vec, + solutions: FromTokioAsyncReadJsonSolutionsReader, + }, + Boolean(bool), +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadJsonQueryResultsReader { + pub async fn read(read: R) -> Result { + let mut reader = FromTokioAsyncReadJsonReader::new(read); + let mut inner = JsonInnerReader::new(); loop { - let event = reader.read_next_event()?; - match event { + if let Some(result) = inner.read_event(reader.read_next_event().await?)? 
{ + return match result { + JsonInnerQueryResults::Solutions { + variables, + solutions, + } => Ok(Self::Solutions { + variables, + solutions: FromTokioAsyncReadJsonSolutionsReader { + inner: solutions, + reader, + }, + }), + JsonInnerQueryResults::Boolean(value) => Ok(Self::Boolean(value)), + }; + } + } + } +} + +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadJsonSolutionsReader { + inner: JsonInnerSolutions, + reader: FromTokioAsyncReadJsonReader, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadJsonSolutionsReader { + pub async fn read_next(&mut self) -> Result>>, QueryResultsParseError> { + match &mut self.inner { + JsonInnerSolutions::Reader(reader) => loop { + let event = self.reader.read_next_event().await?; + if event == JsonEvent::Eof { + return Ok(None); + } + if let Some(result) = reader.read_event(event)? { + return Ok(Some(result)); + } + }, + JsonInnerSolutions::Iterator(iter) => iter.next(), + } + } +} + +enum JsonInnerQueryResults { + Solutions { + variables: Vec, + solutions: JsonInnerSolutions, + }, + Boolean(bool), +} + +enum JsonInnerSolutions { + Reader(JsonInnerSolutionsReader), + Iterator(JsonBufferedSolutionsIterator), +} + +struct JsonInnerReader { + state: JsonInnerReaderState, + variables: Vec, + current_solution_variables: Vec, + current_solution_values: Vec, + solutions: Vec<(Vec, Vec)>, + vars_read: bool, + solutions_read: bool, +} + +enum JsonInnerReaderState { + Start, + InRootObject, + BeforeHead, + InHead, + BeforeVars, + InVars, + BeforeLinks, + InLinks, + BeforeResults, + InResults, + BeforeBindings, + BeforeSolution, + BetweenSolutionTerms, + Term { + reader: JsonInnerTermReader, + variable: String, + }, + AfterBindings, + BeforeBoolean, + Ignore { + level: usize, + after: JsonInnerReaderStateAfterIgnore, + }, +} + +#[allow(clippy::enum_variant_names)] +#[derive(Clone, Copy)] +enum JsonInnerReaderStateAfterIgnore { + InRootObject, + InHead, + InResults, + AfterBindings, +} + +impl JsonInnerReader { + 
fn new() -> Self { + Self { + state: JsonInnerReaderState::Start, + variables: Vec::new(), + current_solution_variables: Vec::new(), + current_solution_values: Vec::new(), + solutions: Vec::new(), + vars_read: false, + solutions_read: false, + } + } + + fn read_event( + &mut self, + event: JsonEvent<'_>, + ) -> Result, QueryResultsSyntaxError> { + match &mut self.state { + JsonInnerReaderState::Start => { + if event == JsonEvent::StartObject { + self.state = JsonInnerReaderState::InRootObject; + Ok(None) + } else { + Err(QueryResultsSyntaxError::msg( + "SPARQL JSON results must be an object", + )) + } + } + JsonInnerReaderState::InRootObject => match event { JsonEvent::ObjectKey(key) => match key.as_ref() { "head" => { - let extracted_variables = read_head(&mut reader)?; - if let Some(buffered_bindings) = buffered_bindings.take() { - let mut mapping = BTreeMap::default(); - for (i, var) in extracted_variables.iter().enumerate() { - mapping.insert(var.as_str().to_owned(), i); - } - output_iter = Some(Self::Solutions { - variables: extracted_variables, - solutions: JsonSolutionsReader { - kind: JsonSolutionsReaderKind::Buffered { - bindings: buffered_bindings.into_iter(), - }, - mapping, - }, - }); - } else { - variables = Some(extracted_variables); - } + self.state = JsonInnerReaderState::BeforeHead; + Ok(None) } "results" => { - if reader.read_next_event()? != JsonEvent::StartObject { - return Err(QueryResultsSyntaxError::msg( - "'results' should be an object", - ) - .into()); - } - loop { - match reader.read_next_event()? { - JsonEvent::ObjectKey(k) if k == "bindings" => break, // Found - JsonEvent::ObjectKey(_) => ignore_value(&mut reader)?, - _ => { - return Err(QueryResultsSyntaxError::msg( - "'results' should contain a 'bindings' key", - ) - .into()) - } - } - } - if reader.read_next_event()? 
!= JsonEvent::StartArray { - return Err(QueryResultsSyntaxError::msg( - "'bindings' should be an object", - ) - .into()); - } - if let Some(variables) = variables { - let mut mapping = BTreeMap::default(); - for (i, var) in variables.iter().enumerate() { - mapping.insert(var.as_str().to_owned(), i); - } - return Ok(Self::Solutions { - variables, - solutions: JsonSolutionsReader { - kind: JsonSolutionsReaderKind::Streaming { reader }, - mapping, - }, - }); - } - // We buffer all results before being able to read the header - let mut bindings = Vec::new(); - let mut variables = Vec::new(); - let mut values = Vec::new(); - loop { - match reader.read_next_event()? { - JsonEvent::StartObject => (), - JsonEvent::EndObject => { - bindings.push((take(&mut variables), take(&mut values))); - } - JsonEvent::EndArray | JsonEvent::Eof => { - buffered_bindings = Some(bindings); - break; - } - JsonEvent::ObjectKey(key) => { - variables.push(key.into_owned()); - values.push(read_value(&mut reader, 0)?); - } - _ => { - return Err(QueryResultsSyntaxError::msg( - "Invalid result serialization", - ) - .into()) - } - } - } + self.state = JsonInnerReaderState::BeforeResults; + Ok(None) } "boolean" => { - return if let JsonEvent::Boolean(v) = reader.read_next_event()? 
{ - Ok(Self::Boolean(v)) - } else { - Err(QueryResultsSyntaxError::msg("Unexpected boolean value").into()) - } + self.state = JsonInnerReaderState::BeforeBoolean; + Ok(None) } _ => { - return Err(QueryResultsSyntaxError::msg(format!( - "Expecting head or result key, found {key}" - )) - .into()); + self.state = JsonInnerReaderState::Ignore { + level: 0, + after: JsonInnerReaderStateAfterIgnore::InRootObject, + }; + Ok(None) + } + }, + JsonEvent::EndObject => Err(QueryResultsSyntaxError::msg( + "SPARQL JSON results must contain a 'boolean' or a 'results' key", + )), + _ => unreachable!(), + }, + JsonInnerReaderState::BeforeHead => { + if event == JsonEvent::StartObject { + self.state = JsonInnerReaderState::InHead; + Ok(None) + } else { + Err(QueryResultsSyntaxError::msg( + "SPARQL JSON results head must be an object", + )) + } + } + JsonInnerReaderState::InHead => match event { + JsonEvent::ObjectKey(key) => match key.as_ref() { + "vars" => { + self.state = JsonInnerReaderState::BeforeVars; + self.vars_read = true; + Ok(None) + } + "links" => { + self.state = JsonInnerReaderState::BeforeLinks; + Ok(None) + } + _ => { + self.state = JsonInnerReaderState::Ignore { + level: 0, + after: JsonInnerReaderStateAfterIgnore::InHead, + }; + Ok(None) + } + }, + JsonEvent::EndObject => { + self.state = JsonInnerReaderState::InRootObject; + Ok(None) + } + _ => unreachable!(), + }, + JsonInnerReaderState::BeforeVars => { + if event == JsonEvent::StartArray { + self.state = JsonInnerReaderState::InVars; + Ok(None) + } else { + Err(QueryResultsSyntaxError::msg( + "SPARQL JSON results vars must be an array", + )) + } + } + JsonInnerReaderState::InVars => match event { + JsonEvent::String(variable) => match Variable::new(variable.clone()) { + Ok(var) => { + if self.variables.contains(&var) { + return Err(QueryResultsSyntaxError::msg(format!( + "The variable {var} is declared twice" + ))); + } + self.variables.push(var); + Ok(None) } + Err(e) => 
Err(QueryResultsSyntaxError::msg(format!( + "Invalid variable name '{variable}': {e}" + ))), }, - JsonEvent::EndObject => (), - JsonEvent::Eof => { - return if let Some(output_iter) = output_iter { - Ok(output_iter) + JsonEvent::EndArray => { + if self.solutions_read { + let mut mapping = BTreeMap::default(); + for (i, var) in self.variables.iter().enumerate() { + mapping.insert(var.as_str().to_owned(), i); + } + Ok(Some(JsonInnerQueryResults::Solutions { + variables: take(&mut self.variables), + solutions: JsonInnerSolutions::Iterator( + JsonBufferedSolutionsIterator { + mapping, + bindings: take(&mut self.solutions).into_iter(), + }, + ), + })) } else { - Err(QueryResultsSyntaxError::msg( - "Unexpected end of JSON object without 'results' or 'boolean' key", - ) - .into()) + self.state = JsonInnerReaderState::InHead; + Ok(None) } } - _ => { - return Err(QueryResultsSyntaxError::msg( - "Invalid SPARQL results serialization", - ) - .into()) + _ => Err(QueryResultsSyntaxError::msg( + "Variables name in the vars array must be strings", + )), + }, + JsonInnerReaderState::BeforeLinks => { + if event == JsonEvent::StartArray { + self.state = JsonInnerReaderState::InLinks; + Ok(None) + } else { + Err(QueryResultsSyntaxError::msg( + "SPARQL JSON results links must be an array", + )) + } + } + JsonInnerReaderState::InLinks => match event { + JsonEvent::String(_) => Ok(None), + JsonEvent::EndArray => { + self.state = JsonInnerReaderState::InHead; + Ok(None) + } + _ => Err(QueryResultsSyntaxError::msg( + "Links in the links array must be strings", + )), + }, + JsonInnerReaderState::BeforeResults => { + if event == JsonEvent::StartObject { + self.state = JsonInnerReaderState::InResults; + Ok(None) + } else { + Err(QueryResultsSyntaxError::msg( + "SPARQL JSON results result must be an object", + )) + } + } + JsonInnerReaderState::InResults => match event { + JsonEvent::ObjectKey(key) => { + if key == "bindings" { + self.state = JsonInnerReaderState::BeforeBindings; + Ok(None) + 
} else { + self.state = JsonInnerReaderState::Ignore { + level: 0, + after: JsonInnerReaderStateAfterIgnore::InResults, + }; + Ok(None) + } + } + JsonEvent::EndObject => Err(QueryResultsSyntaxError::msg( + "The results object must contains a 'bindings' key", + )), + _ => unreachable!(), + }, + JsonInnerReaderState::BeforeBindings => { + if event == JsonEvent::StartArray { + self.solutions_read = true; + if self.vars_read { + let mut mapping = BTreeMap::default(); + for (i, var) in self.variables.iter().enumerate() { + mapping.insert(var.as_str().to_owned(), i); + } + Ok(Some(JsonInnerQueryResults::Solutions { + variables: take(&mut self.variables), + solutions: JsonInnerSolutions::Reader(JsonInnerSolutionsReader { + state: JsonInnerSolutionsReaderState::BeforeSolution, + mapping, + new_bindings: Vec::new(), + }), + })) + } else { + self.state = JsonInnerReaderState::BeforeSolution; + Ok(None) + } + } else { + Err(QueryResultsSyntaxError::msg( + "SPARQL JSON results bindings must be an array", + )) + } + } + JsonInnerReaderState::BeforeSolution => match event { + JsonEvent::StartObject => { + self.state = JsonInnerReaderState::BetweenSolutionTerms; + Ok(None) + } + JsonEvent::EndArray => { + self.state = JsonInnerReaderState::AfterBindings; + Ok(None) + } + _ => Err(QueryResultsSyntaxError::msg( + "Expecting a new solution object", + )), + }, + JsonInnerReaderState::BetweenSolutionTerms => match event { + JsonEvent::ObjectKey(key) => { + self.state = JsonInnerReaderState::Term { + reader: JsonInnerTermReader::default(), + variable: key.into(), + }; + Ok(None) + } + JsonEvent::EndObject => { + self.state = JsonInnerReaderState::BeforeSolution; + self.solutions.push(( + take(&mut self.current_solution_variables), + take(&mut self.current_solution_values), + )); + Ok(None) + } + _ => unreachable!(), + }, + JsonInnerReaderState::Term { + ref mut reader, + variable, + } => { + let result = reader.read_event(event); + if let Some(term) = result? 
{ + self.current_solution_variables.push(take(variable)); + self.current_solution_values.push(term); + self.state = JsonInnerReaderState::BetweenSolutionTerms; } + Ok(None) + } + JsonInnerReaderState::AfterBindings => { + if event == JsonEvent::EndObject { + self.state = JsonInnerReaderState::InRootObject; + } else { + self.state = JsonInnerReaderState::Ignore { + level: 0, + after: JsonInnerReaderStateAfterIgnore::AfterBindings, + } + } + Ok(None) + } + JsonInnerReaderState::BeforeBoolean => { + if let JsonEvent::Boolean(v) = event { + Ok(Some(JsonInnerQueryResults::Boolean(v))) + } else { + Err(QueryResultsSyntaxError::msg("Unexpected boolean value")) + } + } + #[allow(clippy::ref_patterns)] + JsonInnerReaderState::Ignore { level, ref after } => { + let level = match event { + JsonEvent::StartArray | JsonEvent::StartObject => *level + 1, + JsonEvent::EndArray | JsonEvent::EndObject => *level - 1, + JsonEvent::String(_) + | JsonEvent::Number(_) + | JsonEvent::Boolean(_) + | JsonEvent::Null + | JsonEvent::ObjectKey(_) + | JsonEvent::Eof => *level, + }; + self.state = if level == 0 { + match after { + JsonInnerReaderStateAfterIgnore::InRootObject => { + JsonInnerReaderState::InRootObject + } + JsonInnerReaderStateAfterIgnore::InHead => JsonInnerReaderState::InHead, + JsonInnerReaderStateAfterIgnore::InResults => { + JsonInnerReaderState::InResults + } + JsonInnerReaderStateAfterIgnore::AfterBindings => { + JsonInnerReaderState::AfterBindings + } + } + } else { + JsonInnerReaderState::Ignore { + level, + after: *after, + } + }; + Ok(None) } } } } -pub struct JsonSolutionsReader { +struct JsonInnerSolutionsReader { + state: JsonInnerSolutionsReaderState, mapping: BTreeMap, - kind: JsonSolutionsReaderKind, + new_bindings: Vec>, } -enum JsonSolutionsReaderKind { - Streaming { - reader: FromReadJsonReader, - }, - Buffered { - bindings: std::vec::IntoIter<(Vec, Vec)>, +enum JsonInnerSolutionsReaderState { + BeforeSolution, + BetweenSolutionTerms, + Term { + reader: 
JsonInnerTermReader, + key: usize, }, + AfterEnd, } -impl JsonSolutionsReader { - pub fn read_next(&mut self) -> Result>>, QueryResultsParseError> { - match &mut self.kind { - JsonSolutionsReaderKind::Streaming { reader } => { - let mut new_bindings = vec![None; self.mapping.len()]; - loop { - match reader.read_next_event()? { - JsonEvent::StartObject => (), - JsonEvent::EndObject => return Ok(Some(new_bindings)), - JsonEvent::EndArray | JsonEvent::Eof => return Ok(None), - JsonEvent::ObjectKey(key) => { - let k = *self.mapping.get(key.as_ref()).ok_or_else(|| { - QueryResultsSyntaxError::msg(format!( - "The variable {key} has not been defined in the header" - )) - })?; - new_bindings[k] = Some(read_value(reader, 0)?) - } - _ => { - return Err(QueryResultsSyntaxError::msg( - "Invalid result serialization", - ) - .into()) - } - } +impl JsonInnerSolutionsReader { + fn read_event( + &mut self, + event: JsonEvent<'_>, + ) -> Result>>, QueryResultsSyntaxError> { + match &mut self.state { + JsonInnerSolutionsReaderState::BeforeSolution => match event { + JsonEvent::StartObject => { + self.state = JsonInnerSolutionsReaderState::BetweenSolutionTerms; + self.new_bindings = vec![None; self.mapping.len()]; + Ok(None) + } + JsonEvent::EndArray => { + self.state = JsonInnerSolutionsReaderState::AfterEnd; + Ok(None) } + _ => Err(QueryResultsSyntaxError::msg( + "Expecting a new solution object", + )), + }, + JsonInnerSolutionsReaderState::BetweenSolutionTerms => match event { + JsonEvent::ObjectKey(key) => { + let key = *self.mapping.get(key.as_ref()).ok_or_else(|| { + QueryResultsSyntaxError::msg(format!( + "The variable {key} has not been defined in the header" + )) + })?; + self.state = JsonInnerSolutionsReaderState::Term { + reader: JsonInnerTermReader::default(), + key, + }; + Ok(None) + } + JsonEvent::EndObject => { + self.state = JsonInnerSolutionsReaderState::BeforeSolution; + Ok(Some(take(&mut self.new_bindings))) + } + _ => unreachable!(), + }, + 
JsonInnerSolutionsReaderState::Term { + ref mut reader, + key, + } => { + let result = reader.read_event(event); + if let Some(term) = result? { + self.new_bindings[*key] = Some(term); + self.state = JsonInnerSolutionsReaderState::BetweenSolutionTerms; + } + Ok(None) } - JsonSolutionsReaderKind::Buffered { bindings } => { - Ok(if let Some((variables, values)) = bindings.next() { - let mut new_bindings = vec![None; self.mapping.len()]; - for (variable, value) in variables.into_iter().zip(values) { - let k = *self.mapping.get(&variable).ok_or_else(|| { - QueryResultsSyntaxError::msg(format!( - "The variable {variable} has not been defined in the header" - )) - })?; - new_bindings[k] = Some(value) - } - Some(new_bindings) + JsonInnerSolutionsReaderState::AfterEnd => { + if event == JsonEvent::EndObject { + Ok(None) } else { - None - }) + Err(QueryResultsSyntaxError::msg( + "Unexpected JSON after the end of the bindings array", + )) + } } } } } -fn read_value( - reader: &mut FromReadJsonReader, - number_of_recursive_calls: usize, -) -> Result { - enum Type { - Uri, - BNode, - Literal, - #[cfg(feature = "rdf-star")] - Triple, - } - #[derive(Eq, PartialEq)] - enum State { - Type, - Value, - Lang, - Datatype, - } +#[derive(Default)] +struct JsonInnerTermReader { + state: JsonInnerTermReaderState, + term_type: Option, + value: Option, + lang: Option, + datatype: Option, + #[cfg(feature = "rdf-star")] + subject: Option, + #[cfg(feature = "rdf-star")] + predicate: Option, + #[cfg(feature = "rdf-star")] + object: Option, +} - if number_of_recursive_calls == MAX_NUMBER_OF_NESTED_TRIPLES { - return Err(QueryResultsSyntaxError::msg(format!( - "Too many nested triples ({MAX_NUMBER_OF_NESTED_TRIPLES}). The parser fails here to avoid a stack overflow." 
- )) - .into()); - } - let mut state = None; - let mut t = None; - let mut value = None; - let mut lang = None; - let mut datatype = None; +#[derive(Default)] +enum JsonInnerTermReaderState { + #[default] + Start, + Middle, + TermType, + Value, + Lang, + Datatype, #[cfg(feature = "rdf-star")] - let mut subject = None; + InValue, #[cfg(feature = "rdf-star")] - let mut predicate = None; + Subject(Box), #[cfg(feature = "rdf-star")] - let mut object = None; - if reader.read_next_event()? != JsonEvent::StartObject { - return Err(QueryResultsSyntaxError::msg("Term serializations should be an object").into()); - } - loop { - #[allow(unsafe_code)] - // SAFETY: Borrow checker workaround https://github.com/rust-lang/rust/issues/70255 - let next_event = unsafe { - let r: *mut FromReadJsonReader = reader; - &mut *r - } - .read_next_event()?; - match next_event { - JsonEvent::ObjectKey(key) => match key.as_ref() { - "type" => state = Some(State::Type), - "value" => state = Some(State::Value), - "xml:lang" => state = Some(State::Lang), - "datatype" => state = Some(State::Datatype), - #[cfg(feature = "rdf-star")] - "subject" => subject = Some(read_value(reader, number_of_recursive_calls + 1)?), - #[cfg(feature = "rdf-star")] - "predicate" => predicate = Some(read_value(reader, number_of_recursive_calls + 1)?), - #[cfg(feature = "rdf-star")] - "object" => object = Some(read_value(reader, number_of_recursive_calls + 1)?), - _ => { - return Err(QueryResultsSyntaxError::msg(format!( - "Unexpected key in term serialization: '{key}'" + Predicate(Box), + #[cfg(feature = "rdf-star")] + Object(Box), +} + +enum TermType { + Uri, + BNode, + Literal, + #[cfg(feature = "rdf-star")] + Triple, +} + +impl JsonInnerTermReader { + fn read_event( + &mut self, + event: JsonEvent<'_>, + ) -> Result, QueryResultsSyntaxError> { + match &mut self.state { + JsonInnerTermReaderState::Start => { + if event == JsonEvent::StartObject { + self.state = JsonInnerTermReaderState::Middle; + Ok(None) + } else { + 
Err(QueryResultsSyntaxError::msg( + "RDF terms must be encoded using objects", )) - .into()) - } - }, - JsonEvent::StartObject => { - if state != Some(State::Value) { - return Err(QueryResultsSyntaxError::msg( - "Unexpected nested object in term serialization", - ) - .into()); } } - JsonEvent::String(s) => match state { - Some(State::Type) => { - match s.as_ref() { - "uri" => t = Some(Type::Uri), - "bnode" => t = Some(Type::BNode), - "literal" | "typed-literal" => t = Some(Type::Literal), - #[cfg(feature = "rdf-star")] - "triple" => t = Some(Type::Triple), + JsonInnerTermReaderState::Middle => match event { + JsonEvent::ObjectKey(object_key) => { + self.state = match object_key.as_ref() { + "type" => JsonInnerTermReaderState::TermType, + "value" => JsonInnerTermReaderState::Value, + "datatype" => JsonInnerTermReaderState::Datatype, + "xml:lang" => JsonInnerTermReaderState::Lang, _ => { return Err(QueryResultsSyntaxError::msg(format!( - "Unexpected term type: '{s}'" - )) - .into()) + "Unsupported term key: {object_key}" + ))); } }; - state = None; - } - Some(State::Value) => { - value = Some(s.into_owned()); - state = None; + Ok(None) } - Some(State::Lang) => { - lang = Some(s.into_owned()); - state = None; - } - Some(State::Datatype) => { - datatype = Some(NamedNode::new(s).map_err(|e| { - QueryResultsSyntaxError::msg(format!("Invalid datatype IRI: {e}")) - })?); - state = None; - } - _ => (), // impossible - }, - JsonEvent::EndObject => { - if let Some(s) = state { - if s == State::Value { - state = None; // End of triple - } else { - return Err(QueryResultsSyntaxError::msg( - "Term description values should be string", - ) - .into()); - } - } else { - return match t { + JsonEvent::EndObject => { + self.state = JsonInnerTermReaderState::Start; + match self.term_type.take() { None => Err(QueryResultsSyntaxError::msg( "Term serialization should have a 'type' key", - ) - .into()), - Some(Type::Uri) => Ok(NamedNode::new(value.ok_or_else(|| { - 
QueryResultsSyntaxError::msg( - "uri serialization should have a 'value' key", - ) - })?) - .map_err(|e| { - QueryResultsSyntaxError::msg(format!("Invalid uri value: {e}")) - })? - .into()), - Some(Type::BNode) => Ok(BlankNode::new(value.ok_or_else(|| { - QueryResultsSyntaxError::msg( - "bnode serialization should have a 'value' key", - ) - })?) - .map_err(|e| { - QueryResultsSyntaxError::msg(format!("Invalid bnode value: {e}")) - })? - .into()), - Some(Type::Literal) => { - let value = value.ok_or_else(|| { + )), + Some(TermType::Uri) => Ok(Some( + NamedNode::new(self.value.take().ok_or_else(|| { + QueryResultsSyntaxError::msg( + "uri serialization should have a 'value' key", + ) + })?) + .map_err(|e| { + QueryResultsSyntaxError::msg(format!("Invalid uri value: {e}")) + })? + .into(), + )), + Some(TermType::BNode) => Ok(Some( + BlankNode::new(self.value.take().ok_or_else(|| { + QueryResultsSyntaxError::msg( + "bnode serialization should have a 'value' key", + ) + })?) + .map_err(|e| { + QueryResultsSyntaxError::msg(format!("Invalid bnode value: {e}")) + })? + .into(), + )), + Some(TermType::Literal) => { + let value = self.value.take().ok_or_else(|| { QueryResultsSyntaxError::msg( "literal serialization should have a 'value' key", ) })?; - Ok(match lang { - Some(lang) => { - if let Some(datatype) = datatype { - if datatype.as_ref() != rdf::LANG_STRING { - return Err(QueryResultsSyntaxError::msg(format!( - "xml:lang value '{lang}' provided with the datatype {datatype}" - )).into()) + Ok(Some(match self.lang.take() { + Some(lang) => { + if let Some(datatype) = &self.datatype { + if datatype.as_ref() != rdf::LANG_STRING { + return Err(QueryResultsSyntaxError::msg(format!( + "xml:lang value '{lang}' provided with the datatype {datatype}" + ))); + } } + Literal::new_language_tagged_literal(value, &*lang) + .map_err(|e| { + QueryResultsSyntaxError::msg(format!( + "Invalid xml:lang value '{lang}': {e}" + )) + })? 
} - Literal::new_language_tagged_literal(value, &*lang).map_err(|e| { - QueryResultsSyntaxError::msg(format!("Invalid xml:lang value '{lang}': {e}")) - })? - } - None => if let Some(datatype) = datatype { - Literal::new_typed_literal(value, datatype) - } else { - Literal::new_simple_literal(value) - } - } - .into()) + None => { + if let Some(datatype) = self.datatype.take() { + Literal::new_typed_literal(value, datatype) + } else { + Literal::new_simple_literal(value) + } + } + }.into())) } #[cfg(feature = "rdf-star")] - Some(Type::Triple) => Ok(Triple::new( - match subject.ok_or_else(|| { - QueryResultsSyntaxError::msg( - "triple serialization should have a 'subject' key", - ) - })? { - Term::NamedNode(subject) => subject.into(), - Term::BlankNode(subject) => subject.into(), - Term::Triple(subject) => Subject::Triple(subject), - Term::Literal(_) => { - return Err(QueryResultsSyntaxError::msg( - "The 'subject' value should not be a literal", + Some(TermType::Triple) => Ok(Some( + Triple::new( + match self.subject.take().ok_or_else(|| { + QueryResultsSyntaxError::msg( + "triple serialization should have a 'subject' key", ) - .into()) - } - }, - match predicate.ok_or_else(|| { - QueryResultsSyntaxError::msg( - "triple serialization should have a 'predicate' key", - ) - })? { - Term::NamedNode(predicate) => predicate, - _ => { - return Err(QueryResultsSyntaxError::msg( - "The 'predicate' value should be a uri", + })? 
{ + Term::NamedNode(subject) => subject.into(), + Term::BlankNode(subject) => subject.into(), + Term::Triple(subject) => Subject::Triple(subject), + Term::Literal(_) => { + return Err(QueryResultsSyntaxError::msg( + "The 'subject' value should not be a literal", + )); + } + }, + match self.predicate.take().ok_or_else(|| { + QueryResultsSyntaxError::msg( + "triple serialization should have a 'predicate' key", ) - .into()) - } - }, - object.ok_or_else(|| { - QueryResultsSyntaxError::msg( - "triple serialization should have a 'object' key", - ) - })?, - ) - .into()), - }; - } - } - _ => return Err(QueryResultsSyntaxError::msg("Invalid term serialization").into()), - } - } -} - -fn read_head( - reader: &mut FromReadJsonReader, -) -> Result, QueryResultsParseError> { - if reader.read_next_event()? != JsonEvent::StartObject { - return Err(QueryResultsSyntaxError::msg("head should be an object").into()); - } - let mut variables = Vec::new(); - loop { - match reader.read_next_event()? { - JsonEvent::ObjectKey(key) => match key.as_ref() { - "vars" => { - if reader.read_next_event()? != JsonEvent::StartArray { - return Err(QueryResultsSyntaxError::msg( - "Variable list should be an array", - ) - .into()); + })? { + Term::NamedNode(predicate) => predicate, + _ => { + return Err(QueryResultsSyntaxError::msg( + "The 'predicate' value should be a uri", + )); + } + }, + self.object.take().ok_or_else(|| { + QueryResultsSyntaxError::msg( + "triple serialization should have a 'object' key", + ) + })?, + ) + .into(), + )), } - loop { - match reader.read_next_event()? 
{ - JsonEvent::String(s) => { - let new_var = Variable::new(s.as_ref()).map_err(|e| { - QueryResultsSyntaxError::msg(format!( - "Invalid variable declaration '{s}': {e}" - )) - })?; - if variables.contains(&new_var) { - return Err(QueryResultsSyntaxError::msg(format!( - "The variable {new_var} is declared twice" - )) - .into()); - } - variables.push(new_var); - } - JsonEvent::EndArray => break, - _ => { - return Err(QueryResultsSyntaxError::msg( - "Variable names should be strings", - ) - .into()) - } + } + _ => unreachable!(), + }, + JsonInnerTermReaderState::TermType => { + self.state = JsonInnerTermReaderState::Middle; + if let JsonEvent::String(value) = event { + match value.as_ref() { + "uri" => { + self.term_type = Some(TermType::Uri); + Ok(None) + } + "bnode" => { + self.term_type = Some(TermType::BNode); + Ok(None) } + "literal" | "typed-literal" => { + self.term_type = Some(TermType::Literal); + Ok(None) + } + #[cfg(feature = "rdf-star")] + "triple" => { + self.term_type = Some(TermType::Triple); + Ok(None) + } + _ => Err(QueryResultsSyntaxError::msg(format!( + "Unexpected term type: '{value}'" + ))), } + } else { + Err(QueryResultsSyntaxError::msg("Term type must be a string")) } - "link" => { - if reader.read_next_event()? != JsonEvent::StartArray { - return Err(QueryResultsSyntaxError::msg( - "Variable list should be an array", - ) - .into()); - } - loop { - match reader.read_next_event()? 
{ - JsonEvent::String(_) => (), - JsonEvent::EndArray => break, - _ => { - return Err(QueryResultsSyntaxError::msg( - "Link names should be strings", - ) - .into()) - } + } + JsonInnerTermReaderState::Value => match event { + JsonEvent::String(value) => { + self.value = Some(value.into_owned()); + self.state = JsonInnerTermReaderState::Middle; + Ok(None) + } + #[cfg(feature = "rdf-star")] + JsonEvent::StartObject => { + self.state = JsonInnerTermReaderState::InValue; + Ok(None) + } + _ => { + self.state = JsonInnerTermReaderState::Middle; + + Err(QueryResultsSyntaxError::msg("Term value must be a string")) + } + }, + JsonInnerTermReaderState::Lang => { + let result = if let JsonEvent::String(value) = event { + self.lang = Some(value.into_owned()); + Ok(None) + } else { + Err(QueryResultsSyntaxError::msg("Term lang must be strings")) + }; + self.state = JsonInnerTermReaderState::Middle; + + result + } + JsonInnerTermReaderState::Datatype => { + let result = if let JsonEvent::String(value) = event { + match NamedNode::new(value) { + Ok(datatype) => { + self.datatype = Some(datatype); + Ok(None) } + Err(e) => Err(QueryResultsSyntaxError::msg(format!( + "Invalid datatype: {e}" + ))), } + } else { + Err(QueryResultsSyntaxError::msg("Term lang must be strings")) + }; + self.state = JsonInnerTermReaderState::Middle; + + result + } + #[cfg(feature = "rdf-star")] + JsonInnerTermReaderState::InValue => match event { + JsonEvent::ObjectKey(object_key) => { + self.state = match object_key.as_ref() { + "subject" => JsonInnerTermReaderState::Subject(Box::default()), + "predicate" => JsonInnerTermReaderState::Predicate(Box::default()), + "object" => JsonInnerTermReaderState::Object(Box::default()), + _ => { + return Err(QueryResultsSyntaxError::msg(format!( + "Unsupported value key: {object_key}" + ))); + } + }; + Ok(None) + } + JsonEvent::EndObject => { + self.state = JsonInnerTermReaderState::Middle; + Ok(None) } - _ => ignore_value(reader)?, + _ => unreachable!(), }, - 
JsonEvent::EndObject => return Ok(variables), - _ => return Err(QueryResultsSyntaxError::msg("Invalid head serialization").into()), - } - } -} - -fn ignore_value(reader: &mut FromReadJsonReader) -> Result<(), QueryResultsParseError> { - let mut nesting = 0; - loop { - match reader.read_next_event()? { - JsonEvent::Boolean(_) - | JsonEvent::Null - | JsonEvent::Number(_) - | JsonEvent::String(_) => { - if nesting == 0 { - return Ok(()); + #[cfg(feature = "rdf-star")] + JsonInnerTermReaderState::Subject(ref mut inner_state) => { + if let Some(term) = inner_state.read_event(event)? { + self.state = JsonInnerTermReaderState::InValue; + self.subject = Some(term); } + Ok(None) } - JsonEvent::ObjectKey(_) => (), - JsonEvent::StartArray | JsonEvent::StartObject => nesting += 1, - JsonEvent::EndArray | JsonEvent::EndObject => { - nesting -= 1; - if nesting == 0 { - return Ok(()); + #[cfg(feature = "rdf-star")] + JsonInnerTermReaderState::Predicate(ref mut inner_state) => { + if let Some(term) = inner_state.read_event(event)? { + self.state = JsonInnerTermReaderState::InValue; + self.predicate = Some(term); } + Ok(None) } - JsonEvent::Eof => { - return Err(QueryResultsSyntaxError::msg("Unexpected end of file").into()) + #[cfg(feature = "rdf-star")] + JsonInnerTermReaderState::Object(ref mut inner_state) => { + if let Some(term) = inner_state.read_event(event)? 
{ + self.state = JsonInnerTermReaderState::InValue; + self.object = Some(term); + } + Ok(None) } } } } + +pub struct JsonBufferedSolutionsIterator { + mapping: BTreeMap, + bindings: std::vec::IntoIter<(Vec, Vec)>, +} + +impl JsonBufferedSolutionsIterator { + fn next(&mut self) -> Result>>, QueryResultsParseError> { + let Some((variables, values)) = self.bindings.next() else { + return Ok(None); + }; + let mut new_bindings = vec![None; self.mapping.len()]; + for (variable, value) in variables.into_iter().zip(values) { + let k = *self.mapping.get(&variable).ok_or_else(|| { + QueryResultsSyntaxError::msg(format!( + "The variable {variable} has not been defined in the header" + )) + })?; + new_bindings[k] = Some(value); + } + Ok(Some(new_bindings)) + } +} diff --git a/lib/sparesults/src/lib.rs b/lib/sparesults/src/lib.rs index ea3135c4..bfe45051 100644 --- a/lib/sparesults/src/lib.rs +++ b/lib/sparesults/src/lib.rs @@ -16,5 +16,9 @@ mod xml; pub use crate::error::{QueryResultsParseError, QueryResultsSyntaxError, TextPosition}; pub use crate::format::QueryResultsFormat; pub use crate::parser::{FromReadQueryResultsReader, FromReadSolutionsReader, QueryResultsParser}; +#[cfg(feature = "async-tokio")] +pub use crate::parser::{FromTokioAsyncReadQueryResultsReader, FromTokioAsyncReadSolutionsReader}; +#[cfg(feature = "async-tokio")] +pub use crate::serializer::ToTokioAsyncWriteSolutionsWriter; pub use crate::serializer::{QueryResultsSerializer, ToWriteSolutionsWriter}; pub use crate::solution::QuerySolution; diff --git a/lib/sparesults/src/parser.rs b/lib/sparesults/src/parser.rs index f95d9355..602a5e61 100644 --- a/lib/sparesults/src/parser.rs +++ b/lib/sparesults/src/parser.rs @@ -1,12 +1,18 @@ use crate::csv::{TsvQueryResultsReader, TsvSolutionsReader}; use crate::error::{QueryResultsParseError, QueryResultsSyntaxError}; use crate::format::QueryResultsFormat; -use crate::json::{JsonQueryResultsReader, JsonSolutionsReader}; +use 
crate::json::{FromReadJsonQueryResultsReader, FromReadJsonSolutionsReader}; +#[cfg(feature = "async-tokio")] +use crate::json::{ + FromTokioAsyncReadJsonQueryResultsReader, FromTokioAsyncReadJsonSolutionsReader, +}; use crate::solution::QuerySolution; use crate::xml::{XmlQueryResultsReader, XmlSolutionsReader}; use oxrdf::Variable; use std::io::Read; use std::sync::Arc; +#[cfg(feature = "async-tokio")] +use tokio::io::AsyncRead; /// Parsers for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats. /// @@ -45,24 +51,24 @@ impl QueryResultsParser { Self { format } } - /// Reads a result file. + /// Reads a result file from a [`Read`] implementation. /// - /// Reads are buffered. + /// Reads are automatically buffered. /// /// Example in XML (the API is the same for JSON and TSV): /// ``` /// use sparesults::{QueryResultsFormat, QueryResultsParser, FromReadQueryResultsReader}; /// use oxrdf::{Literal, Variable}; /// - /// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml); + /// let xml_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml); /// /// // boolean - /// if let FromReadQueryResultsReader::Boolean(v) = json_parser.parse_read(br#"true"#.as_slice())? { + /// if let FromReadQueryResultsReader::Boolean(v) = xml_parser.parse_read(br#"true"#.as_slice())? { /// assert_eq!(v, true); /// } /// /// // solutions - /// if let FromReadQueryResultsReader::Solutions(solutions) = json_parser.parse_read(br#"test"#.as_slice())? { + /// if let FromReadQueryResultsReader::Solutions(solutions) = xml_parser.parse_read(br#"test"#.as_slice())? 
{ /// assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]); /// for solution in solutions { /// assert_eq!(solution?.iter().collect::>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]); @@ -82,17 +88,17 @@ impl QueryResultsParser { variables, } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { variables: variables.into(), - solutions: SolutionsReaderKind::Xml(solutions), + solutions: FromReadSolutionsReaderKind::Xml(solutions), }), }, - QueryResultsFormat::Json => match JsonQueryResultsReader::read(reader)? { - JsonQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r), - JsonQueryResultsReader::Solutions { + QueryResultsFormat::Json => match FromReadJsonQueryResultsReader::read(reader)? { + FromReadJsonQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r), + FromReadJsonQueryResultsReader::Solutions { solutions, variables, } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { variables: variables.into(), - solutions: SolutionsReaderKind::Json(solutions), + solutions: FromReadSolutionsReaderKind::Json(solutions), }), }, QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()), @@ -103,7 +109,7 @@ impl QueryResultsParser { variables, } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { variables: variables.into(), - solutions: SolutionsReaderKind::Tsv(solutions), + solutions: FromReadSolutionsReaderKind::Tsv(solutions), }), }, }) @@ -116,6 +122,56 @@ impl QueryResultsParser { ) -> Result, QueryResultsParseError> { self.parse_read(reader) } + + /// Reads a result file from a Tokio [`AsyncRead`] implementation. + /// + /// Reads are automatically buffered. 
+ /// + /// Example in XML (note: only JSON is supported with Tokio `AsyncRead` for now; XML and TSV inputs are rejected with an error): + /// ```no_run + /// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader}; + /// use oxrdf::{Literal, Variable}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> Result<(), sparesults::QueryResultsParseError> { + /// let xml_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml); + /// + /// // boolean + /// if let FromTokioAsyncReadQueryResultsReader::Boolean(v) = xml_parser.parse_tokio_async_read(br#"true"#.as_slice()).await? { + /// assert_eq!(v, true); + /// } + /// + /// // solutions + /// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = xml_parser.parse_tokio_async_read(br#"test"#.as_slice()).await? { + /// assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]); + /// while let Some(solution) = solutions.next().await { + /// assert_eq!(solution?.iter().collect::>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]); + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + #[cfg(feature = "async-tokio")] + pub async fn parse_tokio_async_read( + &self, + reader: R, + ) -> Result, QueryResultsParseError> { + Ok(match self.format { + QueryResultsFormat::Xml => return Err(QueryResultsSyntaxError::msg("The XML query results parser does not support Tokio AsyncRead yet").into()), + QueryResultsFormat::Json => match FromTokioAsyncReadJsonQueryResultsReader::read(reader).await? 
{ + FromTokioAsyncReadJsonQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r), + FromTokioAsyncReadJsonQueryResultsReader::Solutions { + solutions, + variables, + } => FromTokioAsyncReadQueryResultsReader::Solutions(FromTokioAsyncReadSolutionsReader { + variables: variables.into(), + solutions: FromTokioAsyncReadSolutionsReaderKind::Json(solutions), + }), + }, + QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()), + QueryResultsFormat::Tsv => return Err(QueryResultsSyntaxError::msg("The TSV query results parser does not support Tokio AsyncRead yet").into()), + }) + } } impl From for QueryResultsParser { @@ -133,16 +189,16 @@ impl From for QueryResultsParser { /// use oxrdf::{Literal, Variable}; /// use sparesults::{FromReadQueryResultsReader, QueryResultsFormat, QueryResultsParser}; /// -/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv); +/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv); /// /// // boolean -/// if let FromReadQueryResultsReader::Boolean(v) = json_parser.parse_read(b"true".as_slice())? { +/// if let FromReadQueryResultsReader::Boolean(v) = tsv_parser.parse_read(b"true".as_slice())? { /// assert_eq!(v, true); /// } /// /// // solutions /// if let FromReadQueryResultsReader::Solutions(solutions) = -/// json_parser.parse_read(b"?foo\t?bar\n\"test\"\t".as_slice())? +/// tsv_parser.parse_read(b"?foo\t?bar\n\"test\"\t".as_slice())? 
/// { /// assert_eq!( /// solutions.variables(), @@ -188,12 +244,12 @@ pub enum FromReadQueryResultsReader { /// ``` pub struct FromReadSolutionsReader { variables: Arc<[Variable]>, - solutions: SolutionsReaderKind, + solutions: FromReadSolutionsReaderKind, } -enum SolutionsReaderKind { +enum FromReadSolutionsReaderKind { Xml(XmlSolutionsReader), - Json(JsonSolutionsReader), + Json(FromReadJsonSolutionsReader), Tsv(TsvSolutionsReader), } @@ -205,9 +261,9 @@ impl FromReadSolutionsReader { /// use oxrdf::Variable; /// use sparesults::{FromReadQueryResultsReader, QueryResultsFormat, QueryResultsParser}; /// - /// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv); + /// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv); /// if let FromReadQueryResultsReader::Solutions(solutions) = - /// json_parser.parse_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())? + /// tsv_parser.parse_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())? /// { /// assert_eq!( /// solutions.variables(), @@ -231,9 +287,141 @@ impl Iterator for FromReadSolutionsReader { fn next(&mut self) -> Option { Some( match &mut self.solutions { - SolutionsReaderKind::Xml(reader) => reader.read_next(), - SolutionsReaderKind::Json(reader) => reader.read_next(), - SolutionsReaderKind::Tsv(reader) => reader.read_next(), + FromReadSolutionsReaderKind::Xml(reader) => reader.read_next(), + FromReadSolutionsReaderKind::Json(reader) => reader.read_next(), + FromReadSolutionsReaderKind::Tsv(reader) => reader.read_next(), + } + .transpose()? + .map(|values| (Arc::clone(&self.variables), values).into()), + ) + } +} + +/// The reader for a given read of a results file. +/// +/// It is either a read boolean ([`bool`]) or a streaming reader of a set of solutions ([`FromTokioAsyncReadSolutionsReader`]). 
+/// +/// Example in TSV (the API is the same for JSON and XML): +/// ```no_run +/// use oxrdf::{Literal, Variable}; +/// use sparesults::{ +/// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser, +/// }; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> { +/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv); +/// +/// // boolean +/// if let FromTokioAsyncReadQueryResultsReader::Boolean(v) = tsv_parser +/// .parse_tokio_async_read(b"true".as_slice()) +/// .await? +/// { +/// assert_eq!(v, true); +/// } +/// +/// // solutions +/// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = tsv_parser +/// .parse_tokio_async_read(b"?foo\t?bar\n\"test\"\t".as_slice()) +/// .await? +/// { +/// assert_eq!( +/// solutions.variables(), +/// &[ +/// Variable::new_unchecked("foo"), +/// Variable::new_unchecked("bar") +/// ] +/// ); +/// while let Some(solution) = solutions.next().await { +/// assert_eq!( +/// solution?.iter().collect::>(), +/// vec![( +/// &Variable::new_unchecked("foo"), +/// &Literal::from("test").into() +/// )] +/// ); +/// } +/// } +/// # Ok(()) +/// # } +/// ``` +#[cfg(feature = "async-tokio")] +pub enum FromTokioAsyncReadQueryResultsReader { + Solutions(FromTokioAsyncReadSolutionsReader), + Boolean(bool), +} + +/// A streaming reader of a set of [`QuerySolution`] solutions. +/// +/// Use the asynchronous `next` method to iterate over the solutions. 
+/// +/// Example in JSON (the API is the same for XML and TSV): +/// ``` +/// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader}; +/// use oxrdf::{Literal, Variable}; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> { +/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Json); +/// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = json_parser.parse_tokio_async_read(br#"{"head":{"vars":["foo","bar"]},"results":{"bindings":[{"foo":{"type":"literal","value":"test"}}]}}"#.as_slice()).await? { +/// assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]); +/// while let Some(solution) = solutions.next().await { +/// assert_eq!(solution?.iter().collect::>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]); +/// } +/// } +/// # Ok(()) +/// # } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadSolutionsReader { + variables: Arc<[Variable]>, + solutions: FromTokioAsyncReadSolutionsReaderKind, +} + +#[cfg(feature = "async-tokio")] +enum FromTokioAsyncReadSolutionsReaderKind { + Json(FromTokioAsyncReadJsonSolutionsReader), +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadSolutionsReader { + /// Ordered list of the declared variables at the beginning of the results. 
+ /// + /// Example in TSV (the API is the same for JSON and XML): + /// ```no_run + /// use oxrdf::Variable; + /// use sparesults::{ + /// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser, + /// }; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> Result<(), sparesults::QueryResultsParseError> { + /// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv); + /// if let FromTokioAsyncReadQueryResultsReader::Solutions(solutions) = tsv_parser + /// .parse_tokio_async_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice()) + /// .await? + /// { + /// assert_eq!( + /// solutions.variables(), + /// &[ + /// Variable::new_unchecked("foo"), + /// Variable::new_unchecked("bar") + /// ] + /// ); + /// } + /// # Ok(()) + /// # } + /// ``` + #[inline] + pub fn variables(&self) -> &[Variable] { + &self.variables + } + + /// Reads the next solution or returns `None` if the file is finished. + pub async fn next(&mut self) -> Option> { + Some( + match &mut self.solutions { + FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await, } .transpose()? .map(|values| (Arc::clone(&self.variables), values).into()),