diff --git a/Cargo.lock b/Cargo.lock index 85ad265f..fc64ec63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,6 +780,9 @@ name = "json-event-parser" version = "0.2.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b5ddd02379e99769e117ab30d21ad42dcec8ad3c12be77f9a34779e62d46346" +dependencies = [ + "tokio", +] [[package]] name = "kernel32-sys" @@ -1731,6 +1734,7 @@ dependencies = [ "memchr", "oxrdf", "quick-xml", + "tokio", ] [[package]] diff --git a/lib/sparesults/Cargo.toml b/lib/sparesults/Cargo.toml index 67d75cf4..fcc95b39 100644 --- a/lib/sparesults/Cargo.toml +++ b/lib/sparesults/Cargo.toml @@ -17,12 +17,17 @@ rust-version.workspace = true [features] default = [] rdf-star = ["oxrdf/rdf-star"] +async-tokio = ["dep:tokio", "quick-xml/async-tokio", "json-event-parser/async-tokio"] [dependencies] json-event-parser = "0.2.0-alpha.2" memchr = "2.5" oxrdf = { version = "0.2.0-alpha.1-dev", path="../oxrdf" } quick-xml = ">=0.29, <0.32" +tokio = { version = "1.29", optional = true, features = ["io-util"] } + +[dev-dependencies] +tokio = { version = "1.29", features = ["rt", "macros"] } [package.metadata.docs.rs] all-features = true diff --git a/lib/sparesults/src/csv.rs b/lib/sparesults/src/csv.rs index b0dc99f1..ac02c99e 100644 --- a/lib/sparesults/src/csv.rs +++ b/lib/sparesults/src/csv.rs @@ -6,38 +6,121 @@ use oxrdf::Variable; use oxrdf::{vocab::xsd, *}; use std::io::{self, Read, Write}; use std::str::{self, FromStr}; +#[cfg(feature = "async-tokio")] +use tokio::io::{AsyncWrite, AsyncWriteExt}; const MAX_BUFFER_SIZE: usize = 4096 * 4096; -pub fn write_boolean_csv_result(mut sink: W, value: bool) -> io::Result { - sink.write_all(if value { b"true" } else { b"false" })?; - Ok(sink) +pub fn write_boolean_csv_result(mut write: W, value: bool) -> io::Result { + write.write_all(if value { b"true" } else { b"false" })?; + Ok(write) } -pub struct CsvSolutionsWriter { - sink: W, +#[cfg(feature = "async-tokio")] +pub async fn tokio_async_write_boolean_csv_result( + mut write: W, + value: bool, +) -> io::Result { + write + .write_all(if value { b"true" } else { b"false" }) + .await?; + Ok(write) +} + +pub struct ToWriteCsvSolutionsWriter { + inner: InnerCsvSolutionsWriter, + write: W, + buffer: String, +} + +impl ToWriteCsvSolutionsWriter { + pub fn start(mut write: W, variables: Vec) -> io::Result { + let mut buffer = String::new(); + let inner = InnerCsvSolutionsWriter::start(&mut buffer, variables); + write.write_all(buffer.as_bytes())?; + buffer.clear(); + Ok(Self { + inner, + write, + buffer, + }) + } + + pub fn write<'a>( + &mut self, + solution: impl IntoIterator, TermRef<'a>)>, + ) -> io::Result<()> { + self.inner.write(&mut self.buffer, solution); + self.write.write_all(self.buffer.as_bytes())?; + self.buffer.clear(); + Ok(()) + } + + pub fn finish(self) -> W { + self.write + } +} + +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteCsvSolutionsWriter { + inner: InnerCsvSolutionsWriter, + write: W, + buffer: String, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteCsvSolutionsWriter { + pub async fn start(mut write: W, variables: Vec) -> io::Result { + let mut buffer = String::new(); + let inner = InnerCsvSolutionsWriter::start(&mut buffer, variables); + write.write_all(buffer.as_bytes()).await?; + buffer.clear(); + Ok(Self { + inner, + write, + buffer, + }) + } + + pub async fn write<'a>( + &mut self, + solution: impl IntoIterator, TermRef<'a>)>, + ) -> io::Result<()> { + self.inner.write(&mut self.buffer, solution); + self.write.write_all(self.buffer.as_bytes()).await?; + self.buffer.clear(); + Ok(()) + } + + pub fn finish(self) -> W { + self.write + } +} + +struct InnerCsvSolutionsWriter { variables: Vec, } -impl CsvSolutionsWriter { - pub fn start(mut sink: W, variables: Vec) -> io::Result { +impl InnerCsvSolutionsWriter { + fn start(output: &mut String, variables: Vec) -> Self { let mut start_vars = true; for variable in &variables { if start_vars { start_vars = false; } else { - sink.write_all(b",")?; + output.push(','); } - sink.write_all(variable.as_str().as_bytes())?; + output.push_str(variable.as_str()); } - sink.write_all(b"\r\n")?; - Ok(Self { sink, variables }) + output.push_str("\r\n"); + Self { variables } } - pub fn write<'a>( - &mut self, + fn write<'a>( + &self, + output: &mut String, solution: impl IntoIterator, TermRef<'a>)>, - ) -> io::Result<()> { + ) { let mut values = vec![None; self.variables.len()]; for (variable, value) in solution { if let Some(position) = self.variables.iter().position(|v| *v == variable) { @@ -49,85 +132,147 @@ impl CsvSolutionsWriter { if start_binding { start_binding = false; } else { - self.sink.write_all(b",")?; + output.push(','); } if let Some(value) = value { - write_csv_term(value, &mut self.sink)?; + write_csv_term(output, value); } } - self.sink.write_all(b"\r\n") - } - - pub fn finish(self) -> W { - self.sink + output.push_str("\r\n"); } } -fn write_csv_term<'a>(term: impl Into>, sink: &mut impl Write) -> io::Result<()> { +fn write_csv_term<'a>(output: &mut String, term: impl Into>) { match term.into() { - TermRef::NamedNode(uri) => sink.write_all(uri.as_str().as_bytes()), + TermRef::NamedNode(uri) => output.push_str(uri.as_str()), TermRef::BlankNode(bnode) => { - sink.write_all(b"_:")?; - sink.write_all(bnode.as_str().as_bytes()) + output.push_str("_:"); + output.push_str(bnode.as_str()) } - TermRef::Literal(literal) => write_escaped_csv_string(literal.value(), sink), + TermRef::Literal(literal) => write_escaped_csv_string(output, literal.value()), #[cfg(feature = "rdf-star")] TermRef::Triple(triple) => { - write_csv_term(&triple.subject, sink)?; - sink.write_all(b" ")?; - write_csv_term(&triple.predicate, sink)?; - sink.write_all(b" ")?; - write_csv_term(&triple.object, sink) + write_csv_term(output, &triple.subject); + output.push(' '); + write_csv_term(output, &triple.predicate); + output.push(' '); + write_csv_term(output, &triple.object) } } } -fn write_escaped_csv_string(s: &str, sink: &mut impl Write) -> io::Result<()> { +fn write_escaped_csv_string(output: &mut String, s: &str) { if s.bytes().any(|c| matches!(c, b'"' | b',' | b'\n' | b'\r')) { - sink.write_all(b"\"")?; - for c in s.bytes() { - if c == b'\"' { - sink.write_all(b"\"\"") + output.push('"'); + for c in s.chars() { + if c == '"' { + output.push('"'); + output.push('"'); } else { - sink.write_all(&[c]) - }?; + output.push(c) + }; } - sink.write_all(b"\"") + output.push('"'); } else { - sink.write_all(s.as_bytes()) + output.push_str(s) } } -pub fn write_boolean_tsv_result(mut sink: W, value: bool) -> io::Result { - sink.write_all(if value { b"true" } else { b"false" })?; - Ok(sink) +pub struct ToWriteTsvSolutionsWriter { + inner: InnerTsvSolutionsWriter, + write: W, + buffer: String, } -pub struct TsvSolutionsWriter { - sink: W, +impl ToWriteTsvSolutionsWriter { + pub fn start(mut write: W, variables: Vec) -> io::Result { + let mut buffer = String::new(); + let inner = InnerTsvSolutionsWriter::start(&mut buffer, variables); + write.write_all(buffer.as_bytes())?; + buffer.clear(); + Ok(Self { + inner, + write, + buffer, + }) + } + + pub fn write<'a>( + &mut self, + solution: impl IntoIterator, TermRef<'a>)>, + ) -> io::Result<()> { + self.inner.write(&mut self.buffer, solution); + self.write.write_all(self.buffer.as_bytes())?; + self.buffer.clear(); + Ok(()) + } + + pub fn finish(self) -> W { + self.write + } +} + +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteTsvSolutionsWriter { + inner: InnerTsvSolutionsWriter, + write: W, + buffer: String, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteTsvSolutionsWriter { + pub async fn start(mut write: W, variables: Vec) -> io::Result { + let mut buffer = String::new(); + let inner = InnerTsvSolutionsWriter::start(&mut buffer, variables); + write.write_all(buffer.as_bytes()).await?; + buffer.clear(); + Ok(Self { + inner, + write, + buffer, + }) + } + + pub async fn write<'a>( + &mut self, + solution: impl IntoIterator, TermRef<'a>)>, + ) -> io::Result<()> { + self.inner.write(&mut self.buffer, solution); + self.write.write_all(self.buffer.as_bytes()).await?; + self.buffer.clear(); + Ok(()) + } + + pub fn finish(self) -> W { + self.write + } +} + +struct InnerTsvSolutionsWriter { variables: Vec, } -impl TsvSolutionsWriter { - pub fn start(mut sink: W, variables: Vec) -> io::Result { +impl InnerTsvSolutionsWriter { + fn start(output: &mut String, variables: Vec) -> Self { let mut start_vars = true; for variable in &variables { if start_vars { start_vars = false; } else { - sink.write_all(b"\t")?; + output.push('\t'); } - sink.write_all(b"?")?; - sink.write_all(variable.as_str().as_bytes())?; + output.push('?'); + output.push_str(variable.as_str()); } - sink.write_all(b"\n")?; - Ok(Self { sink, variables }) + output.push('\n'); + Self { variables } } - pub fn write<'a>( - &mut self, + fn write<'a>( + &self, + output: &mut String, solution: impl IntoIterator, TermRef<'a>)>, - ) -> io::Result<()> { + ) { let mut values = vec![None; self.variables.len()]; for (variable, value) in solution { if let Some(position) = self.variables.iter().position(|v| *v == variable) { @@ -139,70 +284,74 @@ impl TsvSolutionsWriter { if start_binding { start_binding = false; } else { - self.sink.write_all(b"\t")?; + output.push('\t'); } if let Some(value) = value { - write_tsv_term(value, &mut self.sink)?; + write_tsv_term(output, value); } } - self.sink.write_all(b"\n") - } - - pub fn finish(self) -> W { - self.sink + output.push('\n'); } } -fn write_tsv_term<'a>(term: impl Into>, sink: &mut impl Write) -> io::Result<()> { +fn write_tsv_term<'a>(output: &mut String, term: impl Into>) { match term.into() { - TermRef::NamedNode(node) => write!(sink, "<{}>", node.as_str()), - TermRef::BlankNode(node) => write!(sink, "_:{}", node.as_str()), + TermRef::NamedNode(node) => { + output.push('<'); + output.push_str(node.as_str()); + output.push('>'); + } + TermRef::BlankNode(node) => { + output.push_str("_:"); + output.push_str(node.as_str()); + } TermRef::Literal(literal) => { let value = literal.value(); if let Some(language) = literal.language() { - write_tsv_quoted_str(value, sink)?; - write!(sink, "@{language}") + write_tsv_quoted_str(output, value); + output.push('@'); + output.push_str(language); } else { match literal.datatype() { - xsd::BOOLEAN if is_turtle_boolean(value) => sink.write_all(value.as_bytes()), - xsd::INTEGER if is_turtle_integer(value) => sink.write_all(value.as_bytes()), - xsd::DECIMAL if is_turtle_decimal(value) => sink.write_all(value.as_bytes()), - xsd::DOUBLE if is_turtle_double(value) => sink.write_all(value.as_bytes()), - xsd::STRING => write_tsv_quoted_str(value, sink), + xsd::BOOLEAN if is_turtle_boolean(value) => output.push_str(value), + xsd::INTEGER if is_turtle_integer(value) => output.push_str(value), + xsd::DECIMAL if is_turtle_decimal(value) => output.push_str(value), + xsd::DOUBLE if is_turtle_double(value) => output.push_str(value), + xsd::STRING => write_tsv_quoted_str(output, value), datatype => { - write_tsv_quoted_str(value, sink)?; - write!(sink, "^^<{}>", datatype.as_str()) + write_tsv_quoted_str(output, value); + output.push_str("^^"); + write_tsv_term(output, datatype); } } } } #[cfg(feature = "rdf-star")] TermRef::Triple(triple) => { - sink.write_all(b"<< ")?; - write_tsv_term(&triple.subject, sink)?; - sink.write_all(b" ")?; - write_tsv_term(&triple.predicate, sink)?; - sink.write_all(b" ")?; - write_tsv_term(&triple.object, sink)?; - sink.write_all(b" >>")?; - Ok(()) + output.push_str("<< "); + write_tsv_term(output, &triple.subject); + output.push(' '); + write_tsv_term(output, &triple.predicate); + output.push(' '); + write_tsv_term(output, &triple.object); + output.push_str(" >>"); } } } -fn write_tsv_quoted_str(string: &str, f: &mut impl Write) -> io::Result<()> { - f.write_all(b"\"")?; - for c in string.bytes() { +fn write_tsv_quoted_str(output: &mut String, string: &str) { + output.push('"'); + for c in string.chars() { match c { - b'\t' => f.write_all(b"\\t"), - b'\n' => f.write_all(b"\\n"), - b'\r' => f.write_all(b"\\r"), - b'"' => f.write_all(b"\\\""), - b'\\' => f.write_all(b"\\\\"), - _ => f.write_all(&[c]), - }?; - } - f.write_all(b"\"") + '\t' => output.push_str("\\t"), + '\n' => output.push_str("\\n"), + '\r' => output.push_str("\\r"), + '"' => output.push_str("\\\""), + '\\' => output.push_str("\\\\"), + _ => output.push(c), + }; + } + output.push('"'); } fn is_turtle_boolean(value: &str) -> bool { @@ -439,7 +588,7 @@ impl LineReader { if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) { break self.buffer_start + eol + 1; } - if self.buffer_start > buffer.len() / 2 { + if self.buffer_start > 0 { buffer.copy_within(self.buffer_start..self.buffer_end, 0); self.buffer_end -= self.buffer_start; self.buffer_start = 0; @@ -480,7 +629,6 @@ mod tests { use super::*; use std::error::Error; use std::rc::Rc; - use std::str; fn build_example() -> (Vec, Vec>>) { ( @@ -530,21 +678,21 @@ mod tests { } #[test] - fn test_csv_serialization() -> io::Result<()> { + fn test_csv_serialization() { let (variables, solutions) = build_example(); - let mut writer = CsvSolutionsWriter::start(Vec::new(), variables.clone())?; + let mut buffer = String::new(); + let writer = InnerCsvSolutionsWriter::start(&mut buffer, variables.clone()); let variables = Rc::new(variables); for solution in solutions { writer.write( + &mut buffer, variables .iter() .zip(&solution) .filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))), - )?; + ); } - let result = writer.finish(); - assert_eq!(str::from_utf8(&result).unwrap(), "x,literal\r\nhttp://example/x,String\r\nhttp://example/x,\"String-with-dquote\"\"\"\r\n_:b0,Blank node\r\n,Missing 'x'\r\n,\r\nhttp://example/x,\r\n_:b1,String-with-lang\r\n_:b1,123\r\n,\"escape,\t\r\n\"\r\n"); - Ok(()) + assert_eq!(buffer, "x,literal\r\nhttp://example/x,String\r\nhttp://example/x,\"String-with-dquote\"\"\"\r\n_:b0,Blank node\r\n,Missing 'x'\r\n,\r\nhttp://example/x,\r\n_:b1,String-with-lang\r\n_:b1,123\r\n,\"escape,\t\r\n\"\r\n"); } #[test] @@ -552,24 +700,25 @@ mod tests { let (variables, solutions) = build_example(); // Write - let mut writer = TsvSolutionsWriter::start(Vec::new(), variables.clone())?; + let mut buffer = String::new(); + let writer = InnerTsvSolutionsWriter::start(&mut buffer, variables.clone()); let variables = Rc::new(variables); for solution in &solutions { writer.write( + &mut buffer, variables .iter() .zip(solution) .filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))), - )?; + ); } - let result = writer.finish(); - assert_eq!(str::from_utf8(&result).unwrap(), "?x\t?literal\n\t\"String\"\n\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n"); + assert_eq!(buffer, "?x\t?literal\n\t\"String\"\n\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n"); // Read if let TsvQueryResultsReader::Solutions { solutions: mut solutions_iter, variables: actual_variables, - } = TsvQueryResultsReader::read(result.as_slice())? + } = TsvQueryResultsReader::read(buffer.as_bytes())? { assert_eq!(actual_variables.as_slice(), variables.as_slice()); let mut rows = Vec::new(); @@ -610,21 +759,19 @@ mod tests { } #[test] - fn test_no_columns_csv_serialization() -> io::Result<()> { - let mut writer = CsvSolutionsWriter::start(Vec::new(), Vec::new())?; - writer.write([])?; - let result = writer.finish(); - assert_eq!(str::from_utf8(&result).unwrap(), "\r\n\r\n"); - Ok(()) + fn test_no_columns_csv_serialization() { + let mut buffer = String::new(); + let writer = InnerCsvSolutionsWriter::start(&mut buffer, Vec::new()); + writer.write(&mut buffer, []); + assert_eq!(buffer, "\r\n\r\n"); } #[test] - fn test_no_columns_tsv_serialization() -> io::Result<()> { - let mut writer = TsvSolutionsWriter::start(Vec::new(), Vec::new())?; - writer.write([])?; - let result = writer.finish(); - assert_eq!(str::from_utf8(&result).unwrap(), "\n\n"); - Ok(()) + fn test_no_columns_tsv_serialization() { + let mut buffer = String::new(); + let writer = InnerTsvSolutionsWriter::start(&mut buffer, Vec::new()); + writer.write(&mut buffer, []); + assert_eq!(buffer, "\n\n"); } #[test] @@ -644,19 +791,17 @@ mod tests { } #[test] - fn test_no_results_csv_serialization() -> io::Result<()> { - let result = - CsvSolutionsWriter::start(Vec::new(), vec![Variable::new_unchecked("a")])?.finish(); - assert_eq!(str::from_utf8(&result).unwrap(), "a\r\n"); - Ok(()) + fn test_no_results_csv_serialization() { + let mut buffer = String::new(); + InnerCsvSolutionsWriter::start(&mut buffer, vec![Variable::new_unchecked("a")]); + assert_eq!(buffer, "a\r\n"); } #[test] - fn test_no_results_tsv_serialization() -> io::Result<()> { - let result = - TsvSolutionsWriter::start(Vec::new(), vec![Variable::new_unchecked("a")])?.finish(); - assert_eq!(str::from_utf8(&result).unwrap(), "?a\n"); - Ok(()) + fn test_no_results_tsv_serialization() { + let mut buffer = String::new(); + InnerTsvSolutionsWriter::start(&mut buffer, vec![Variable::new_unchecked("a")]); + assert_eq!(buffer, "?a\n"); } #[test] diff --git a/lib/sparesults/src/json.rs b/lib/sparesults/src/json.rs index 3dc9be11..cbaa873c 100644 --- a/lib/sparesults/src/json.rs +++ b/lib/sparesults/src/json.rs @@ -1,6 +1,8 @@ //! Implementation of [SPARQL Query Results JSON Format](https://www.w3.org/TR/sparql11-results-json/) use crate::error::{ParseError, SyntaxError}; +#[cfg(feature = "async-tokio")] +use json_event_parser::ToTokioAsyncWriteJsonWriter; use json_event_parser::{FromReadJsonReader, JsonEvent, ToWriteJsonWriter}; use oxrdf::vocab::rdf; use oxrdf::Variable; @@ -8,6 +10,8 @@ use oxrdf::*; use std::collections::BTreeMap; use std::io::{self, Read, Write}; use std::mem::take; +#[cfg(feature = "async-tokio")] +use tokio::io::AsyncWrite; /// This limit is set in order to avoid stack overflow error when parsing nested triples due to too many recursive calls. /// The actual limit value is a wet finger compromise between not failing to parse valid files and avoiding to trigger stack overflow errors. @@ -15,116 +19,210 @@ const MAX_NUMBER_OF_NESTED_TRIPLES: usize = 128; pub fn write_boolean_json_result(write: W, value: bool) -> io::Result { let mut writer = ToWriteJsonWriter::new(write); - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("head".into()))?; - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::EndObject)?; - writer.write_event(JsonEvent::ObjectKey("boolean".into()))?; - writer.write_event(JsonEvent::Boolean(value))?; - writer.write_event(JsonEvent::EndObject)?; + for event in inner_write_boolean_json_result(value) { + writer.write_event(event)?; + } + writer.finish() +} + +#[cfg(feature = "async-tokio")] +pub async fn tokio_async_write_boolean_json_result( + write: W, + value: bool, +) -> io::Result { + let mut writer = ToTokioAsyncWriteJsonWriter::new(write); + for event in inner_write_boolean_json_result(value) { + writer.write_event(event).await?; + } writer.finish() } -pub struct JsonSolutionsWriter { +fn inner_write_boolean_json_result(value: bool) -> [JsonEvent<'static>; 7] { + [ + JsonEvent::StartObject, + JsonEvent::ObjectKey("head".into()), + JsonEvent::StartObject, + JsonEvent::EndObject, + JsonEvent::ObjectKey("boolean".into()), + JsonEvent::Boolean(value), + JsonEvent::EndObject, + ] +} + +pub struct ToWriteJsonSolutionsWriter { + inner: InnerJsonSolutionsWriter, writer: ToWriteJsonWriter, } -impl JsonSolutionsWriter { +impl ToWriteJsonSolutionsWriter { pub fn start(write: W, variables: &[Variable]) -> io::Result { let mut writer = ToWriteJsonWriter::new(write); - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("head".into()))?; - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("vars".into()))?; - writer.write_event(JsonEvent::StartArray)?; - for variable in variables { - writer.write_event(JsonEvent::String(variable.as_str().into()))?; - } - writer.write_event(JsonEvent::EndArray)?; - writer.write_event(JsonEvent::EndObject)?; - writer.write_event(JsonEvent::ObjectKey("results".into()))?; - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("bindings".into()))?; - writer.write_event(JsonEvent::StartArray)?; - Ok(Self { writer }) + let mut buffer = Vec::with_capacity(48); + let inner = InnerJsonSolutionsWriter::start(&mut buffer, variables); + Self::do_write(&mut writer, buffer)?; + Ok(Self { inner, writer }) } pub fn write<'a>( &mut self, solution: impl IntoIterator, TermRef<'a>)>, ) -> io::Result<()> { - self.writer.write_event(JsonEvent::StartObject)?; - for (variable, value) in solution { - self.writer - .write_event(JsonEvent::ObjectKey(variable.as_str().into()))?; - write_json_term(value, &mut self.writer)?; + let mut buffer = Vec::with_capacity(48); + self.inner.write(&mut buffer, solution); + Self::do_write(&mut self.writer, buffer) + } + + pub fn finish(mut self) -> io::Result { + let mut buffer = Vec::with_capacity(4); + self.inner.finish(&mut buffer); + Self::do_write(&mut self.writer, buffer)?; + self.writer.finish() + } + + fn do_write(writer: &mut ToWriteJsonWriter, output: Vec>) -> io::Result<()> { + for event in output { + writer.write_event(event)?; } - self.writer.write_event(JsonEvent::EndObject)?; Ok(()) } +} - pub fn finish(mut self) -> io::Result { - self.writer.write_event(JsonEvent::EndArray)?; - self.writer.write_event(JsonEvent::EndObject)?; - self.writer.write_event(JsonEvent::EndObject)?; +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteJsonSolutionsWriter { + inner: InnerJsonSolutionsWriter, + writer: ToTokioAsyncWriteJsonWriter, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteJsonSolutionsWriter { + pub async fn start(write: W, variables: &[Variable]) -> io::Result { + let mut writer = ToTokioAsyncWriteJsonWriter::new(write); + let mut buffer = Vec::with_capacity(48); + let inner = InnerJsonSolutionsWriter::start(&mut buffer, variables); + Self::do_write(&mut writer, buffer).await?; + Ok(Self { writer, inner }) + } + + pub async fn write<'a>( + &mut self, + solution: impl IntoIterator, TermRef<'a>)>, + ) -> io::Result<()> { + let mut buffer = Vec::with_capacity(48); + self.inner.write(&mut buffer, solution); + Self::do_write(&mut self.writer, buffer).await + } + + pub async fn finish(mut self) -> io::Result { + let mut buffer = Vec::with_capacity(4); + self.inner.finish(&mut buffer); + Self::do_write(&mut self.writer, buffer).await?; self.writer.finish() } + + async fn do_write( + writer: &mut ToTokioAsyncWriteJsonWriter, + output: Vec>, + ) -> io::Result<()> { + for event in output { + writer.write_event(event).await?; + } + Ok(()) + } +} + +struct InnerJsonSolutionsWriter; + +impl InnerJsonSolutionsWriter { + fn start<'a>(output: &mut Vec>, variables: &'a [Variable]) -> Self { + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("head".into())); + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("vars".into())); + output.push(JsonEvent::StartArray); + for variable in variables { + output.push(JsonEvent::String(variable.as_str().into())); + } + output.push(JsonEvent::EndArray); + output.push(JsonEvent::EndObject); + output.push(JsonEvent::ObjectKey("results".into())); + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("bindings".into())); + output.push(JsonEvent::StartArray); + Self {} + } + + #[allow(clippy::unused_self)] + fn write<'a>( + &self, + output: &mut Vec>, + solution: impl IntoIterator, TermRef<'a>)>, + ) { + output.push(JsonEvent::StartObject); + for (variable, value) in solution { + output.push(JsonEvent::ObjectKey(variable.as_str().into())); + write_json_term(output, value); + } + output.push(JsonEvent::EndObject); + } + + #[allow(clippy::unused_self)] + fn finish(self, output: &mut Vec>) { + output.push(JsonEvent::EndArray); + output.push(JsonEvent::EndObject); + output.push(JsonEvent::EndObject); + } } -fn write_json_term( - term: TermRef<'_>, - writer: &mut ToWriteJsonWriter, -) -> io::Result<()> { +fn write_json_term<'a>(output: &mut Vec>, term: TermRef<'a>) { match term { TermRef::NamedNode(uri) => { - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("type".into()))?; - writer.write_event(JsonEvent::String("uri".into()))?; - writer.write_event(JsonEvent::ObjectKey("value".into()))?; - writer.write_event(JsonEvent::String(uri.as_str().into()))?; - writer.write_event(JsonEvent::EndObject)?; + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("type".into())); + output.push(JsonEvent::String("uri".into())); + output.push(JsonEvent::ObjectKey("value".into())); + output.push(JsonEvent::String(uri.as_str().into())); + output.push(JsonEvent::EndObject); } TermRef::BlankNode(bnode) => { - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("type".into()))?; - writer.write_event(JsonEvent::String("bnode".into()))?; - writer.write_event(JsonEvent::ObjectKey("value".into()))?; - writer.write_event(JsonEvent::String(bnode.as_str().into()))?; - writer.write_event(JsonEvent::EndObject)?; + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("type".into())); + output.push(JsonEvent::String("bnode".into())); + output.push(JsonEvent::ObjectKey("value".into())); + output.push(JsonEvent::String(bnode.as_str().into())); + output.push(JsonEvent::EndObject); } TermRef::Literal(literal) => { - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("type".into()))?; - writer.write_event(JsonEvent::String("literal".into()))?; - writer.write_event(JsonEvent::ObjectKey("value".into()))?; - writer.write_event(JsonEvent::String(literal.value().into()))?; + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("type".into())); + output.push(JsonEvent::String("literal".into())); + output.push(JsonEvent::ObjectKey("value".into())); + output.push(JsonEvent::String(literal.value().into())); if let Some(language) = literal.language() { - writer.write_event(JsonEvent::ObjectKey("xml:lang".into()))?; - writer.write_event(JsonEvent::String(language.into()))?; + output.push(JsonEvent::ObjectKey("xml:lang".into())); + output.push(JsonEvent::String(language.into())); } else if !literal.is_plain() { - writer.write_event(JsonEvent::ObjectKey("datatype".into()))?; - writer.write_event(JsonEvent::String(literal.datatype().as_str().into()))?; + output.push(JsonEvent::ObjectKey("datatype".into())); + output.push(JsonEvent::String(literal.datatype().as_str().into())); } - writer.write_event(JsonEvent::EndObject)?; + output.push(JsonEvent::EndObject); } #[cfg(feature = "rdf-star")] TermRef::Triple(triple) => { - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("type".into()))?; - writer.write_event(JsonEvent::String("triple".into()))?; - writer.write_event(JsonEvent::ObjectKey("value".into()))?; - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("subject".into()))?; - write_json_term(triple.subject.as_ref().into(), writer)?; - writer.write_event(JsonEvent::ObjectKey("predicate".into()))?; - write_json_term(triple.predicate.as_ref().into(), writer)?; - writer.write_event(JsonEvent::ObjectKey("object".into()))?; - write_json_term(triple.object.as_ref(), writer)?; - writer.write_event(JsonEvent::EndObject)?; - writer.write_event(JsonEvent::EndObject)?; + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("type".into())); + output.push(JsonEvent::String("triple".into())); + output.push(JsonEvent::ObjectKey("value".into())); + output.push(JsonEvent::StartObject); + output.push(JsonEvent::ObjectKey("subject".into())); + write_json_term(output, triple.subject.as_ref().into()); + output.push(JsonEvent::ObjectKey("predicate".into())); + write_json_term(output, triple.predicate.as_ref().into()); + output.push(JsonEvent::ObjectKey("object".into())); + write_json_term(output, triple.object.as_ref()); + output.push(JsonEvent::EndObject); + output.push(JsonEvent::EndObject); } } - Ok(()) } pub enum JsonQueryResultsReader { diff --git a/lib/sparesults/src/serializer.rs b/lib/sparesults/src/serializer.rs index 6f5bc7dc..95356fb8 100644 --- a/lib/sparesults/src/serializer.rs +++ b/lib/sparesults/src/serializer.rs @@ -1,11 +1,20 @@ +#[cfg(feature = "async-tokio")] use crate::csv::{ - write_boolean_csv_result, write_boolean_tsv_result, CsvSolutionsWriter, TsvSolutionsWriter, + tokio_async_write_boolean_csv_result, ToTokioAsyncWriteCsvSolutionsWriter, + ToTokioAsyncWriteTsvSolutionsWriter, }; +use crate::csv::{write_boolean_csv_result, ToWriteCsvSolutionsWriter, ToWriteTsvSolutionsWriter}; use crate::format::QueryResultsFormat; -use crate::json::{write_boolean_json_result, JsonSolutionsWriter}; -use crate::xml::{write_boolean_xml_result, XmlSolutionsWriter}; +#[cfg(feature = "async-tokio")] +use crate::json::{tokio_async_write_boolean_json_result, ToTokioAsyncWriteJsonSolutionsWriter}; +use crate::json::{write_boolean_json_result, ToWriteJsonSolutionsWriter}; +#[cfg(feature = "async-tokio")] +use crate::xml::{tokio_async_write_boolean_xml_result, ToTokioAsyncWriteXmlSolutionsWriter}; +use crate::xml::{write_boolean_xml_result, ToWriteXmlSolutionsWriter}; use oxrdf::{TermRef, Variable, VariableRef}; use std::io::{self, Write}; +#[cfg(feature = "async-tokio")] +use tokio::io::AsyncWrite; /// A serializer for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats. /// @@ -15,7 +24,7 @@ use std::io::{self, Write}; /// * [SPARQL Query Results CSV Format](https://www.w3.org/TR/sparql11-results-csv-tsv/) ([`QueryResultsFormat::Csv`](QueryResultsFormat::Csv)) /// * [SPARQL Query Results TSV Format](https://www.w3.org/TR/sparql11-results-csv-tsv/) ([`QueryResultsFormat::Tsv`](QueryResultsFormat::Tsv)) /// -/// Example in JSON (the API is the same for XML and TSV): +/// Example in JSON (the API is the same for XML, CSV and TSV): /// ``` /// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// use oxrdf::{LiteralRef, Variable, VariableRef}; @@ -49,13 +58,13 @@ impl QueryResultsSerializer { /// Write a boolean query result (from an `ASK` query) into the given [`Write`] implementation. /// - /// Example in XML (the API is the same for JSON and TSV): + /// Example in XML (the API is the same for JSON, CSV and TSV): /// ``` /// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// - /// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml); + /// let xml_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml); /// let mut buffer = Vec::new(); - /// json_serializer.serialize_boolean_to_write(&mut buffer, true)?; + /// xml_serializer.serialize_boolean_to_write(&mut buffer, true)?; /// assert_eq!(buffer, b"true"); /// # std::io::Result::Ok(()) /// ``` @@ -63,8 +72,39 @@ impl QueryResultsSerializer { match self.format { QueryResultsFormat::Xml => write_boolean_xml_result(write, value), QueryResultsFormat::Json => write_boolean_json_result(write, value), - QueryResultsFormat::Csv => write_boolean_csv_result(write, value), - QueryResultsFormat::Tsv => write_boolean_tsv_result(write, value), + QueryResultsFormat::Csv | QueryResultsFormat::Tsv => { + write_boolean_csv_result(write, value) + } + } + } + + /// Write a boolean query result (from an `ASK` query) into the given [`AsyncWrite`] implementation. + /// + /// Example in JSON (the API is the same for XML, CSV and TSV): + /// ``` + /// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> std::io::Result<()> { + /// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Json); + /// let mut buffer = Vec::new(); + /// json_serializer.serialize_boolean_to_tokio_async_write(&mut buffer, false).await?; + /// assert_eq!(buffer, b"{\"head\":{},\"boolean\":false}"); + /// # Ok(()) + /// # } + /// ``` + #[cfg(feature = "async-tokio")] + pub async fn serialize_boolean_to_tokio_async_write( + &self, + write: W, + value: bool, + ) -> io::Result { + match self.format { + QueryResultsFormat::Xml => tokio_async_write_boolean_xml_result(write, value).await, + QueryResultsFormat::Json => tokio_async_write_boolean_json_result(write, value).await, + QueryResultsFormat::Csv | QueryResultsFormat::Tsv => { + tokio_async_write_boolean_csv_result(write, value).await + } } } @@ -79,15 +119,15 @@ impl QueryResultsSerializer { /// ///
This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.
/// - /// Example in XML (the API is the same for JSON and TSV): + /// Example in XML (the API is the same for JSON, CSV and TSV): /// ``` /// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// use oxrdf::{LiteralRef, Variable, VariableRef}; /// use std::iter::once; /// - /// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml); + /// let xml_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml); /// let mut buffer = Vec::new(); - /// let mut writer = json_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?; + /// let mut writer = xml_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?; /// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test"))))?; /// writer.finish()?; /// assert_eq!(buffer, b"test"); @@ -100,18 +140,65 @@ impl QueryResultsSerializer { ) -> io::Result> { Ok(ToWriteSolutionsWriter { formatter: match self.format { - QueryResultsFormat::Xml => { - ToWriteSolutionsWriterKind::Xml(XmlSolutionsWriter::start(write, &variables)?) - } - QueryResultsFormat::Json => { - ToWriteSolutionsWriterKind::Json(JsonSolutionsWriter::start(write, &variables)?) - } - QueryResultsFormat::Csv => { - ToWriteSolutionsWriterKind::Csv(CsvSolutionsWriter::start(write, variables)?) - } - QueryResultsFormat::Tsv => { - ToWriteSolutionsWriterKind::Tsv(TsvSolutionsWriter::start(write, variables)?) - } + QueryResultsFormat::Xml => ToWriteSolutionsWriterKind::Xml( + ToWriteXmlSolutionsWriter::start(write, &variables)?, + ), + QueryResultsFormat::Json => ToWriteSolutionsWriterKind::Json( + ToWriteJsonSolutionsWriter::start(write, &variables)?, + ), + QueryResultsFormat::Csv => ToWriteSolutionsWriterKind::Csv( + ToWriteCsvSolutionsWriter::start(write, variables)?, + ), + QueryResultsFormat::Tsv => ToWriteSolutionsWriterKind::Tsv( + ToWriteTsvSolutionsWriter::start(write, variables)?, + ), + }, + }) + } + + /// Returns a `SolutionsWriter` allowing writing query solutions into the given [`Write`] implementation. + /// + ///
Do not forget to run the [`finish`](ToWriteSolutionsWriter::finish()) method to properly write the last bytes of the file.
+ /// + ///
This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.
+ /// + /// Example in XML (the API is the same for JSON, CSV and TSV): + /// ``` + /// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; + /// use oxrdf::{LiteralRef, Variable, VariableRef}; + /// use std::iter::once; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> std::io::Result<()> { + /// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Json); + /// let mut buffer = Vec::new(); + /// let mut writer = json_serializer.serialize_solutions_to_tokio_async_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]).await?; + /// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test")))).await?; + /// writer.finish().await?; + /// assert_eq!(buffer, b"{\"head\":{\"vars\":[\"foo\",\"bar\"]},\"results\":{\"bindings\":[{\"foo\":{\"type\":\"literal\",\"value\":\"test\"}}]}}"); + /// # Ok(()) + /// # } + /// ``` + #[cfg(feature = "async-tokio")] + pub async fn serialize_solutions_to_tokio_async_write( + &self, + write: W, + variables: Vec, + ) -> io::Result> { + Ok(ToTokioAsyncWriteSolutionsWriter { + formatter: match self.format { + QueryResultsFormat::Xml => ToTokioAsyncWriteSolutionsWriterKind::Xml( + ToTokioAsyncWriteXmlSolutionsWriter::start(write, &variables).await?, + ), + QueryResultsFormat::Json => ToTokioAsyncWriteSolutionsWriterKind::Json( + ToTokioAsyncWriteJsonSolutionsWriter::start(write, &variables).await?, + ), + QueryResultsFormat::Csv => ToTokioAsyncWriteSolutionsWriterKind::Csv( + ToTokioAsyncWriteCsvSolutionsWriter::start(write, variables).await?, + ), + QueryResultsFormat::Tsv => ToTokioAsyncWriteSolutionsWriterKind::Tsv( + ToTokioAsyncWriteTsvSolutionsWriter::start(write, variables).await?, + ), }, }) } @@ -126,22 +213,23 @@ impl QueryResultsSerializer { } } -/// Allows writing query results. +/// Allows writing query results into a [`Write`] implementation. +/// /// Could be built using a [`QueryResultsSerializer`]. /// ///
Do not forget to run the [`finish`](ToWriteSolutionsWriter::finish()) method to properly write the last bytes of the file.
/// ///
This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.
/// -/// Example in TSV (the API is the same for JSON and XML): +/// Example in TSV (the API is the same for JSON, XML and CSV): /// ``` /// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// use oxrdf::{LiteralRef, Variable, VariableRef}; /// use std::iter::once; /// -/// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Tsv); +/// let tsv_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Tsv); /// let mut buffer = Vec::new(); -/// let mut writer = json_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?; +/// let mut writer = tsv_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?; /// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test"))))?; /// writer.finish()?; /// assert_eq!(buffer, b"?foo\t?bar\n\"test\"\t\n"); @@ -153,16 +241,16 @@ pub struct ToWriteSolutionsWriter { } enum ToWriteSolutionsWriterKind { - Xml(XmlSolutionsWriter), - Json(JsonSolutionsWriter), - Csv(CsvSolutionsWriter), - Tsv(TsvSolutionsWriter), + Xml(ToWriteXmlSolutionsWriter), + Json(ToWriteJsonSolutionsWriter), + Csv(ToWriteCsvSolutionsWriter), + Tsv(ToWriteTsvSolutionsWriter), } impl ToWriteSolutionsWriter { /// Writes a solution. /// - /// Example in JSON (the API is the same for XML and TSV): + /// Example in JSON (the API is the same for XML, CSV and TSV): /// ``` /// use sparesults::{QueryResultsFormat, QueryResultsSerializer, QuerySolution}; /// use oxrdf::{Literal, LiteralRef, Variable, VariableRef}; @@ -200,3 +288,88 @@ impl ToWriteSolutionsWriter { } } } + +/// Allows writing query results into an [`AsyncWrite`] implementation. + +/// Could be built using a [`QueryResultsSerializer`]. +/// +///
Do not forget to run the [`finish`](ToTokioAsyncWriteSolutionsWriter::finish()) method to properly write the last bytes of the file.
+/// +///
This writer does unbuffered writes. You might want to use [`BufWriter`](tokio::io::BufWriter) to avoid that.
+/// +/// Example in TSV (the API is the same for JSON, CSV and XML): +/// ``` +/// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; +/// use oxrdf::{LiteralRef, Variable, VariableRef}; +/// use std::iter::once; +/// +/// # #[tokio::main(flavor = "current_thread")] +/// # async fn main() -> std::io::Result<()> { +/// let tsv_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Tsv); +/// let mut buffer = Vec::new(); +/// let mut writer = tsv_serializer.serialize_solutions_to_tokio_async_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]).await?; +/// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test")))).await?; +/// writer.finish().await?; +/// assert_eq!(buffer, b"?foo\t?bar\n\"test\"\t\n"); +/// # Ok(()) +/// # } +/// ``` +#[cfg(feature = "async-tokio")] +#[must_use] +pub struct ToTokioAsyncWriteSolutionsWriter { + formatter: ToTokioAsyncWriteSolutionsWriterKind, +} + +#[cfg(feature = "async-tokio")] +enum ToTokioAsyncWriteSolutionsWriterKind { + Xml(ToTokioAsyncWriteXmlSolutionsWriter), + Json(ToTokioAsyncWriteJsonSolutionsWriter), + Csv(ToTokioAsyncWriteCsvSolutionsWriter), + Tsv(ToTokioAsyncWriteTsvSolutionsWriter), +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteSolutionsWriter { + /// Writes a solution. + /// + /// Example in JSON (the API is the same for XML, CSV and TSV): + /// ``` + /// use sparesults::{QueryResultsFormat, QueryResultsSerializer, QuerySolution}; + /// use oxrdf::{Literal, LiteralRef, Variable, VariableRef}; + /// use std::iter::once; + /// + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn main() -> std::io::Result<()> { + /// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Json); + /// let mut buffer = Vec::new(); + /// let mut writer = json_serializer.serialize_solutions_to_tokio_async_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]).await?; + /// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test")))).await?; + /// writer.write(&QuerySolution::from((vec![Variable::new_unchecked("bar")], vec![Some(Literal::from("test").into())]))).await?; + /// writer.finish().await?; + /// assert_eq!(buffer, b"{\"head\":{\"vars\":[\"foo\",\"bar\"]},\"results\":{\"bindings\":[{\"foo\":{\"type\":\"literal\",\"value\":\"test\"}},{\"bar\":{\"type\":\"literal\",\"value\":\"test\"}}]}}"); + /// # Ok(()) + /// # } + /// ``` + pub async fn write<'a>( + &mut self, + solution: impl IntoIterator>, impl Into>)>, + ) -> io::Result<()> { + let solution = solution.into_iter().map(|(v, s)| (v.into(), s.into())); + match &mut self.formatter { + ToTokioAsyncWriteSolutionsWriterKind::Xml(writer) => writer.write(solution).await, + ToTokioAsyncWriteSolutionsWriterKind::Json(writer) => writer.write(solution).await, + ToTokioAsyncWriteSolutionsWriterKind::Csv(writer) => writer.write(solution).await, + ToTokioAsyncWriteSolutionsWriterKind::Tsv(writer) => writer.write(solution).await, + } + } + + /// Writes the last bytes of the file. + pub async fn finish(self) -> io::Result { + match self.formatter { + ToTokioAsyncWriteSolutionsWriterKind::Xml(write) => write.finish().await, + ToTokioAsyncWriteSolutionsWriterKind::Json(write) => write.finish().await, + ToTokioAsyncWriteSolutionsWriterKind::Csv(write) => Ok(write.finish()), + ToTokioAsyncWriteSolutionsWriterKind::Tsv(write) => Ok(write.finish()), + } + } +} diff --git a/lib/sparesults/src/xml.rs b/lib/sparesults/src/xml.rs index 7df4fa9e..808c0f8c 100644 --- a/lib/sparesults/src/xml.rs +++ b/lib/sparesults/src/xml.rs @@ -11,147 +11,212 @@ use std::collections::BTreeMap; use std::io::{self, BufReader, Read, Write}; use std::str; use std::sync::Arc; +#[cfg(feature = "async-tokio")] +use tokio::io::AsyncWrite; -pub fn write_boolean_xml_result(sink: W, value: bool) -> io::Result { - do_write_boolean_xml_result(sink, value).map_err(map_xml_error) +pub fn write_boolean_xml_result(write: W, value: bool) -> io::Result { + let mut writer = Writer::new(write); + for event in inner_write_boolean_xml_result(value) { + writer.write_event(event).map_err(map_xml_error)?; + } + Ok(writer.into_inner()) } -fn do_write_boolean_xml_result(sink: W, value: bool) -> Result { - let mut writer = Writer::new(sink); - writer.write_event(Event::Decl(BytesDecl::new("1.0", None, None)))?; - writer - .create_element("sparql") - .with_attribute(("xmlns", "http://www.w3.org/2005/sparql-results#")) - .write_inner_content(|writer| { - writer - .create_element("head") - .write_text_content(BytesText::new(""))? - .create_element("boolean") - .write_text_content(BytesText::new(if value { "true" } else { "false" }))?; - quick_xml::Result::Ok(()) - })?; +#[cfg(feature = "async-tokio")] +pub async fn tokio_async_write_boolean_xml_result( + write: W, + value: bool, +) -> io::Result { + let mut writer = Writer::new(write); + for event in inner_write_boolean_xml_result(value) { + writer + .write_event_async(event) + .await + .map_err(map_xml_error)?; + } Ok(writer.into_inner()) } -pub struct XmlSolutionsWriter { - writer: Writer, +fn inner_write_boolean_xml_result(value: bool) -> [Event<'static>; 8] { + [ + Event::Decl(BytesDecl::new("1.0", None, None)), + Event::Start( + BytesStart::new("sparql") + .with_attributes([("xmlns", "http://www.w3.org/2005/sparql-results#")]), + ), + Event::Start(BytesStart::new("head")), + Event::End(BytesEnd::new("head")), + Event::Start(BytesStart::new("boolean")), + Event::Text(BytesText::new(if value { "true" } else { "false" })), + Event::End(BytesEnd::new("boolean")), + Event::End(BytesEnd::new("sparql")), + ] } -impl XmlSolutionsWriter { - pub fn start(sink: W, variables: &[Variable]) -> io::Result { - Self::do_start(sink, variables).map_err(map_xml_error) - } +pub struct ToWriteXmlSolutionsWriter { + inner: InnerXmlSolutionsWriter, + writer: Writer, +} - fn do_start(sink: W, variables: &[Variable]) -> Result { - let mut writer = Writer::new(sink); - writer.write_event(Event::Decl(BytesDecl::new("1.0", None, None)))?; - let mut sparql_open = BytesStart::new("sparql"); - sparql_open.push_attribute(("xmlns", "http://www.w3.org/2005/sparql-results#")); - writer.write_event(Event::Start(sparql_open))?; - writer - .create_element("head") - .write_inner_content(|writer| { - for variable in variables { - writer - .create_element("variable") - .with_attribute(("name", variable.as_str())) - .write_empty()?; - } - quick_xml::Result::Ok(()) - })?; - writer.write_event(Event::Start(BytesStart::new("results")))?; - Ok(Self { writer }) +impl ToWriteXmlSolutionsWriter { + pub fn start(write: W, variables: &[Variable]) -> io::Result { + let mut writer = Writer::new(write); + let mut buffer = Vec::with_capacity(48); + let inner = InnerXmlSolutionsWriter::start(&mut buffer, variables); + Self::do_write(&mut writer, buffer)?; + Ok(Self { inner, writer }) } pub fn write<'a>( &mut self, solution: impl IntoIterator, TermRef<'a>)>, ) -> io::Result<()> { - self.do_write(solution).map_err(map_xml_error) + let mut buffer = Vec::with_capacity(48); + self.inner.write(&mut buffer, solution); + Self::do_write(&mut self.writer, buffer) + } + + pub fn finish(mut self) -> io::Result { + let mut buffer = Vec::with_capacity(4); + self.inner.finish(&mut buffer); + Self::do_write(&mut self.writer, buffer)?; + Ok(self.writer.into_inner()) + } + + fn do_write(writer: &mut Writer, output: Vec>) -> io::Result<()> { + for event in output { + writer.write_event(event).map_err(map_xml_error)?; + } + Ok(()) + } +} + +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteXmlSolutionsWriter { + inner: InnerXmlSolutionsWriter, + writer: Writer, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteXmlSolutionsWriter { + pub async fn start(write: W, variables: &[Variable]) -> io::Result { + let mut writer = Writer::new(write); + let mut buffer = Vec::with_capacity(48); + let inner = InnerXmlSolutionsWriter::start(&mut buffer, variables); + Self::do_write(&mut writer, buffer).await?; + Ok(Self { writer, inner }) } - fn do_write<'a>( + pub async fn write<'a>( &mut self, solution: impl IntoIterator, TermRef<'a>)>, - ) -> Result<(), quick_xml::Error> { - self.writer - .create_element("result") - .write_inner_content(|writer| { - for (variable, value) in solution { - writer - .create_element("binding") - .with_attribute(("name", variable.as_str())) - .write_inner_content(|writer| write_xml_term(value, writer))?; - } - quick_xml::Result::Ok(()) - })?; + ) -> io::Result<()> { + let mut buffer = Vec::with_capacity(48); + self.inner.write(&mut buffer, solution); + Self::do_write(&mut self.writer, buffer).await + } + + pub async fn finish(mut self) -> io::Result { + let mut buffer = Vec::with_capacity(4); + self.inner.finish(&mut buffer); + Self::do_write(&mut self.writer, buffer).await?; + Ok(self.writer.into_inner()) + } + + async fn do_write(writer: &mut Writer, output: Vec>) -> io::Result<()> { + for event in output { + writer + .write_event_async(event) + .await + .map_err(map_xml_error)?; + } Ok(()) } +} + +struct InnerXmlSolutionsWriter; - pub fn finish(self) -> io::Result { - self.do_finish().map_err(map_xml_error) +impl InnerXmlSolutionsWriter { + fn start<'a>(output: &mut Vec>, variables: &'a [Variable]) -> Self { + output.push(Event::Decl(BytesDecl::new("1.0", None, None))); + output.push(Event::Start(BytesStart::new("sparql").with_attributes([( + "xmlns", + "http://www.w3.org/2005/sparql-results#", + )]))); + output.push(Event::Start(BytesStart::new("head"))); + for variable in variables { + output.push(Event::Empty( + BytesStart::new("variable").with_attributes([("name", variable.as_str())]), + )); + } + output.push(Event::End(BytesEnd::new("head"))); + output.push(Event::Start(BytesStart::new("results"))); + Self {} } - fn do_finish(mut self) -> Result { - self.writer - .write_event(Event::End(BytesEnd::new("results")))?; - self.writer - .write_event(Event::End(BytesEnd::new("sparql")))?; - Ok(self.writer.into_inner()) + #[allow(clippy::unused_self)] + + fn write<'a>( + &self, + output: &mut Vec>, + solution: impl IntoIterator, TermRef<'a>)>, + ) { + output.push(Event::Start(BytesStart::new("result"))); + for (variable, value) in solution { + output.push(Event::Start( + BytesStart::new("binding").with_attributes([("name", variable.as_str())]), + )); + write_xml_term(output, value); + output.push(Event::End(BytesEnd::new("binding"))); + } + output.push(Event::End(BytesEnd::new("result"))); + } + + #[allow(clippy::unused_self)] + fn finish(self, output: &mut Vec>) { + output.push(Event::End(BytesEnd::new("results"))); + output.push(Event::End(BytesEnd::new("sparql"))); } } -fn write_xml_term( - term: TermRef<'_>, - writer: &mut Writer, -) -> Result<(), quick_xml::Error> { +fn write_xml_term<'a>(output: &mut Vec>, term: TermRef<'a>) { match term { TermRef::NamedNode(uri) => { - writer - .create_element("uri") - .write_text_content(BytesText::new(uri.as_str()))?; + output.push(Event::Start(BytesStart::new("uri"))); + output.push(Event::Text(BytesText::new(uri.as_str()))); + output.push(Event::End(BytesEnd::new("uri"))); } TermRef::BlankNode(bnode) => { - writer - .create_element("bnode") - .write_text_content(BytesText::new(bnode.as_str()))?; + output.push(Event::Start(BytesStart::new("bnode"))); + output.push(Event::Text(BytesText::new(bnode.as_str()))); + output.push(Event::End(BytesEnd::new("bnode"))); } TermRef::Literal(literal) => { - let element = writer.create_element("literal"); - let element = if let Some(language) = literal.language() { - element.with_attribute(("xml:lang", language)) + let mut start = BytesStart::new("literal"); + if let Some(language) = literal.language() { + start.push_attribute(("xml:lang", language)); } else if !literal.is_plain() { - element.with_attribute(("datatype", literal.datatype().as_str())) - } else { - element - }; - element.write_text_content(BytesText::new(literal.value()))?; + start.push_attribute(("datatype", literal.datatype().as_str())) + } + output.push(Event::Start(start)); + output.push(Event::Text(BytesText::new(literal.value()))); + output.push(Event::End(BytesEnd::new("literal"))); } #[cfg(feature = "rdf-star")] TermRef::Triple(triple) => { - writer - .create_element("triple") - .write_inner_content(|writer| { - writer - .create_element("subject") - .write_inner_content(|writer| { - write_xml_term(triple.subject.as_ref().into(), writer) - })?; - writer - .create_element("predicate") - .write_inner_content(|writer| { - write_xml_term(triple.predicate.as_ref().into(), writer) - })?; - writer - .create_element("object") - .write_inner_content(|writer| { - write_xml_term(triple.object.as_ref(), writer) - })?; - quick_xml::Result::Ok(()) - })?; + output.push(Event::Start(BytesStart::new("triple"))); + output.push(Event::Start(BytesStart::new("subject"))); + write_xml_term(output, triple.subject.as_ref().into()); + output.push(Event::End(BytesEnd::new("subject"))); + output.push(Event::Start(BytesStart::new("predicate"))); + write_xml_term(output, triple.predicate.as_ref().into()); + output.push(Event::End(BytesEnd::new("predicate"))); + output.push(Event::Start(BytesStart::new("object"))); + write_xml_term(output, triple.object.as_ref()); + output.push(Event::End(BytesEnd::new("object"))); + output.push(Event::End(BytesEnd::new("triple"))); } } - Ok(()) } pub enum XmlQueryResultsReader {