Adds Tokio support to SPARQL results serializers

pull/680/head
Tpt 1 year ago committed by Thomas Tanon
parent d1cb4cecbd
commit 756c5394d0
  1. 4
      Cargo.lock
  2. 5
      lib/sparesults/Cargo.toml
  3. 401
      lib/sparesults/src/csv.rs
  4. 246
      lib/sparesults/src/json.rs
  5. 237
      lib/sparesults/src/serializer.rs
  6. 261
      lib/sparesults/src/xml.rs

4
Cargo.lock generated

@ -780,6 +780,9 @@ name = "json-event-parser"
version = "0.2.0-alpha.2" version = "0.2.0-alpha.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b5ddd02379e99769e117ab30d21ad42dcec8ad3c12be77f9a34779e62d46346" checksum = "5b5ddd02379e99769e117ab30d21ad42dcec8ad3c12be77f9a34779e62d46346"
dependencies = [
"tokio",
]
[[package]] [[package]]
name = "kernel32-sys" name = "kernel32-sys"
@ -1731,6 +1734,7 @@ dependencies = [
"memchr", "memchr",
"oxrdf", "oxrdf",
"quick-xml", "quick-xml",
"tokio",
] ]
[[package]] [[package]]

@ -17,12 +17,17 @@ rust-version.workspace = true
[features] [features]
default = [] default = []
rdf-star = ["oxrdf/rdf-star"] rdf-star = ["oxrdf/rdf-star"]
async-tokio = ["dep:tokio", "quick-xml/async-tokio", "json-event-parser/async-tokio"]
[dependencies] [dependencies]
json-event-parser = "0.2.0-alpha.2" json-event-parser = "0.2.0-alpha.2"
memchr = "2.5" memchr = "2.5"
oxrdf = { version = "0.2.0-alpha.1-dev", path="../oxrdf" } oxrdf = { version = "0.2.0-alpha.1-dev", path="../oxrdf" }
quick-xml = ">=0.29, <0.32" quick-xml = ">=0.29, <0.32"
tokio = { version = "1.29", optional = true, features = ["io-util"] }
[dev-dependencies]
tokio = { version = "1.29", features = ["rt", "macros"] }
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true

@ -6,38 +6,121 @@ use oxrdf::Variable;
use oxrdf::{vocab::xsd, *}; use oxrdf::{vocab::xsd, *};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::str::{self, FromStr}; use std::str::{self, FromStr};
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncWrite, AsyncWriteExt};
const MAX_BUFFER_SIZE: usize = 4096 * 4096; const MAX_BUFFER_SIZE: usize = 4096 * 4096;
pub fn write_boolean_csv_result<W: Write>(mut sink: W, value: bool) -> io::Result<W> { pub fn write_boolean_csv_result<W: Write>(mut write: W, value: bool) -> io::Result<W> {
sink.write_all(if value { b"true" } else { b"false" })?; write.write_all(if value { b"true" } else { b"false" })?;
Ok(sink) Ok(write)
}
#[cfg(feature = "async-tokio")]
pub async fn tokio_async_write_boolean_csv_result<W: AsyncWrite + Unpin>(
mut write: W,
value: bool,
) -> io::Result<W> {
write
.write_all(if value { b"true" } else { b"false" })
.await?;
Ok(write)
}
pub struct ToWriteCsvSolutionsWriter<W: Write> {
inner: InnerCsvSolutionsWriter,
write: W,
buffer: String,
}
impl<W: Write> ToWriteCsvSolutionsWriter<W> {
pub fn start(mut write: W, variables: Vec<Variable>) -> io::Result<Self> {
let mut buffer = String::new();
let inner = InnerCsvSolutionsWriter::start(&mut buffer, variables);
write.write_all(buffer.as_bytes())?;
buffer.clear();
Ok(Self {
inner,
write,
buffer,
})
} }
pub struct CsvSolutionsWriter<W: Write> { pub fn write<'a>(
sink: W, &mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> {
self.inner.write(&mut self.buffer, solution);
self.write.write_all(self.buffer.as_bytes())?;
self.buffer.clear();
Ok(())
}
pub fn finish(self) -> W {
self.write
}
}
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteCsvSolutionsWriter<W: AsyncWrite + Unpin> {
inner: InnerCsvSolutionsWriter,
write: W,
buffer: String,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteCsvSolutionsWriter<W> {
pub async fn start(mut write: W, variables: Vec<Variable>) -> io::Result<Self> {
let mut buffer = String::new();
let inner = InnerCsvSolutionsWriter::start(&mut buffer, variables);
write.write_all(buffer.as_bytes()).await?;
buffer.clear();
Ok(Self {
inner,
write,
buffer,
})
}
pub async fn write<'a>(
&mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> {
self.inner.write(&mut self.buffer, solution);
self.write.write_all(self.buffer.as_bytes()).await?;
self.buffer.clear();
Ok(())
}
pub fn finish(self) -> W {
self.write
}
}
struct InnerCsvSolutionsWriter {
variables: Vec<Variable>, variables: Vec<Variable>,
} }
impl<W: Write> CsvSolutionsWriter<W> { impl InnerCsvSolutionsWriter {
pub fn start(mut sink: W, variables: Vec<Variable>) -> io::Result<Self> { fn start(output: &mut String, variables: Vec<Variable>) -> Self {
let mut start_vars = true; let mut start_vars = true;
for variable in &variables { for variable in &variables {
if start_vars { if start_vars {
start_vars = false; start_vars = false;
} else { } else {
sink.write_all(b",")?; output.push(',');
} }
sink.write_all(variable.as_str().as_bytes())?; output.push_str(variable.as_str());
} }
sink.write_all(b"\r\n")?; output.push_str("\r\n");
Ok(Self { sink, variables }) Self { variables }
} }
pub fn write<'a>( fn write<'a>(
&mut self, &self,
output: &mut String,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>, solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> { ) {
let mut values = vec![None; self.variables.len()]; let mut values = vec![None; self.variables.len()];
for (variable, value) in solution { for (variable, value) in solution {
if let Some(position) = self.variables.iter().position(|v| *v == variable) { if let Some(position) = self.variables.iter().position(|v| *v == variable) {
@ -49,85 +132,147 @@ impl<W: Write> CsvSolutionsWriter<W> {
if start_binding { if start_binding {
start_binding = false; start_binding = false;
} else { } else {
self.sink.write_all(b",")?; output.push(',');
} }
if let Some(value) = value { if let Some(value) = value {
write_csv_term(value, &mut self.sink)?; write_csv_term(output, value);
} }
} }
self.sink.write_all(b"\r\n") output.push_str("\r\n");
}
pub fn finish(self) -> W {
self.sink
} }
} }
fn write_csv_term<'a>(term: impl Into<TermRef<'a>>, sink: &mut impl Write) -> io::Result<()> { fn write_csv_term<'a>(output: &mut String, term: impl Into<TermRef<'a>>) {
match term.into() { match term.into() {
TermRef::NamedNode(uri) => sink.write_all(uri.as_str().as_bytes()), TermRef::NamedNode(uri) => output.push_str(uri.as_str()),
TermRef::BlankNode(bnode) => { TermRef::BlankNode(bnode) => {
sink.write_all(b"_:")?; output.push_str("_:");
sink.write_all(bnode.as_str().as_bytes()) output.push_str(bnode.as_str())
} }
TermRef::Literal(literal) => write_escaped_csv_string(literal.value(), sink), TermRef::Literal(literal) => write_escaped_csv_string(output, literal.value()),
#[cfg(feature = "rdf-star")] #[cfg(feature = "rdf-star")]
TermRef::Triple(triple) => { TermRef::Triple(triple) => {
write_csv_term(&triple.subject, sink)?; write_csv_term(output, &triple.subject);
sink.write_all(b" ")?; output.push(' ');
write_csv_term(&triple.predicate, sink)?; write_csv_term(output, &triple.predicate);
sink.write_all(b" ")?; output.push(' ');
write_csv_term(&triple.object, sink) write_csv_term(output, &triple.object)
} }
} }
} }
fn write_escaped_csv_string(s: &str, sink: &mut impl Write) -> io::Result<()> { fn write_escaped_csv_string(output: &mut String, s: &str) {
if s.bytes().any(|c| matches!(c, b'"' | b',' | b'\n' | b'\r')) { if s.bytes().any(|c| matches!(c, b'"' | b',' | b'\n' | b'\r')) {
sink.write_all(b"\"")?; output.push('"');
for c in s.bytes() { for c in s.chars() {
if c == b'\"' { if c == '"' {
sink.write_all(b"\"\"") output.push('"');
output.push('"');
} else { } else {
sink.write_all(&[c]) output.push(c)
}?; };
} }
sink.write_all(b"\"") output.push('"');
} else { } else {
sink.write_all(s.as_bytes()) output.push_str(s)
}
}
pub struct ToWriteTsvSolutionsWriter<W: Write> {
inner: InnerTsvSolutionsWriter,
write: W,
buffer: String,
}
impl<W: Write> ToWriteTsvSolutionsWriter<W> {
pub fn start(mut write: W, variables: Vec<Variable>) -> io::Result<Self> {
let mut buffer = String::new();
let inner = InnerTsvSolutionsWriter::start(&mut buffer, variables);
write.write_all(buffer.as_bytes())?;
buffer.clear();
Ok(Self {
inner,
write,
buffer,
})
} }
pub fn write<'a>(
&mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> {
self.inner.write(&mut self.buffer, solution);
self.write.write_all(self.buffer.as_bytes())?;
self.buffer.clear();
Ok(())
} }
pub fn write_boolean_tsv_result<W: Write>(mut sink: W, value: bool) -> io::Result<W> { pub fn finish(self) -> W {
sink.write_all(if value { b"true" } else { b"false" })?; self.write
Ok(sink) }
} }
pub struct TsvSolutionsWriter<W: Write> { #[cfg(feature = "async-tokio")]
sink: W, pub struct ToTokioAsyncWriteTsvSolutionsWriter<W: AsyncWrite + Unpin> {
inner: InnerTsvSolutionsWriter,
write: W,
buffer: String,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteTsvSolutionsWriter<W> {
pub async fn start(mut write: W, variables: Vec<Variable>) -> io::Result<Self> {
let mut buffer = String::new();
let inner = InnerTsvSolutionsWriter::start(&mut buffer, variables);
write.write_all(buffer.as_bytes()).await?;
buffer.clear();
Ok(Self {
inner,
write,
buffer,
})
}
pub async fn write<'a>(
&mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> {
self.inner.write(&mut self.buffer, solution);
self.write.write_all(self.buffer.as_bytes()).await?;
self.buffer.clear();
Ok(())
}
pub fn finish(self) -> W {
self.write
}
}
struct InnerTsvSolutionsWriter {
variables: Vec<Variable>, variables: Vec<Variable>,
} }
impl<W: Write> TsvSolutionsWriter<W> { impl InnerTsvSolutionsWriter {
pub fn start(mut sink: W, variables: Vec<Variable>) -> io::Result<Self> { fn start(output: &mut String, variables: Vec<Variable>) -> Self {
let mut start_vars = true; let mut start_vars = true;
for variable in &variables { for variable in &variables {
if start_vars { if start_vars {
start_vars = false; start_vars = false;
} else { } else {
sink.write_all(b"\t")?; output.push('\t');
} }
sink.write_all(b"?")?; output.push('?');
sink.write_all(variable.as_str().as_bytes())?; output.push_str(variable.as_str());
} }
sink.write_all(b"\n")?; output.push('\n');
Ok(Self { sink, variables }) Self { variables }
} }
pub fn write<'a>( fn write<'a>(
&mut self, &self,
output: &mut String,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>, solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> { ) {
let mut values = vec![None; self.variables.len()]; let mut values = vec![None; self.variables.len()];
for (variable, value) in solution { for (variable, value) in solution {
if let Some(position) = self.variables.iter().position(|v| *v == variable) { if let Some(position) = self.variables.iter().position(|v| *v == variable) {
@ -139,70 +284,74 @@ impl<W: Write> TsvSolutionsWriter<W> {
if start_binding { if start_binding {
start_binding = false; start_binding = false;
} else { } else {
self.sink.write_all(b"\t")?; output.push('\t');
} }
if let Some(value) = value { if let Some(value) = value {
write_tsv_term(value, &mut self.sink)?; write_tsv_term(output, value);
} }
} }
self.sink.write_all(b"\n") output.push('\n');
}
pub fn finish(self) -> W {
self.sink
} }
} }
fn write_tsv_term<'a>(term: impl Into<TermRef<'a>>, sink: &mut impl Write) -> io::Result<()> { fn write_tsv_term<'a>(output: &mut String, term: impl Into<TermRef<'a>>) {
match term.into() { match term.into() {
TermRef::NamedNode(node) => write!(sink, "<{}>", node.as_str()), TermRef::NamedNode(node) => {
TermRef::BlankNode(node) => write!(sink, "_:{}", node.as_str()), output.push('<');
output.push_str(node.as_str());
output.push('>');
}
TermRef::BlankNode(node) => {
output.push_str("_:");
output.push_str(node.as_str());
}
TermRef::Literal(literal) => { TermRef::Literal(literal) => {
let value = literal.value(); let value = literal.value();
if let Some(language) = literal.language() { if let Some(language) = literal.language() {
write_tsv_quoted_str(value, sink)?; write_tsv_quoted_str(output, value);
write!(sink, "@{language}") output.push('@');
output.push_str(language);
} else { } else {
match literal.datatype() { match literal.datatype() {
xsd::BOOLEAN if is_turtle_boolean(value) => sink.write_all(value.as_bytes()), xsd::BOOLEAN if is_turtle_boolean(value) => output.push_str(value),
xsd::INTEGER if is_turtle_integer(value) => sink.write_all(value.as_bytes()), xsd::INTEGER if is_turtle_integer(value) => output.push_str(value),
xsd::DECIMAL if is_turtle_decimal(value) => sink.write_all(value.as_bytes()), xsd::DECIMAL if is_turtle_decimal(value) => output.push_str(value),
xsd::DOUBLE if is_turtle_double(value) => sink.write_all(value.as_bytes()), xsd::DOUBLE if is_turtle_double(value) => output.push_str(value),
xsd::STRING => write_tsv_quoted_str(value, sink), xsd::STRING => write_tsv_quoted_str(output, value),
datatype => { datatype => {
write_tsv_quoted_str(value, sink)?; write_tsv_quoted_str(output, value);
write!(sink, "^^<{}>", datatype.as_str()) output.push_str("^^");
write_tsv_term(output, datatype);
} }
} }
} }
} }
#[cfg(feature = "rdf-star")] #[cfg(feature = "rdf-star")]
TermRef::Triple(triple) => { TermRef::Triple(triple) => {
sink.write_all(b"<< ")?; output.push_str("<< ");
write_tsv_term(&triple.subject, sink)?; write_tsv_term(output, &triple.subject);
sink.write_all(b" ")?; output.push(' ');
write_tsv_term(&triple.predicate, sink)?; write_tsv_term(output, &triple.predicate);
sink.write_all(b" ")?; output.push(' ');
write_tsv_term(&triple.object, sink)?; write_tsv_term(output, &triple.object);
sink.write_all(b" >>")?; output.push_str(" >>");
Ok(())
} }
} }
} }
fn write_tsv_quoted_str(string: &str, f: &mut impl Write) -> io::Result<()> { fn write_tsv_quoted_str(output: &mut String, string: &str) {
f.write_all(b"\"")?; output.push('"');
for c in string.bytes() { for c in string.chars() {
match c { match c {
b'\t' => f.write_all(b"\\t"), '\t' => output.push_str("\\t"),
b'\n' => f.write_all(b"\\n"), '\n' => output.push_str("\\n"),
b'\r' => f.write_all(b"\\r"), '\r' => output.push_str("\\r"),
b'"' => f.write_all(b"\\\""), '"' => output.push_str("\\\""),
b'\\' => f.write_all(b"\\\\"), '\\' => output.push_str("\\\\"),
_ => f.write_all(&[c]), _ => output.push(c),
}?; };
} }
f.write_all(b"\"") output.push('"');
} }
fn is_turtle_boolean(value: &str) -> bool { fn is_turtle_boolean(value: &str) -> bool {
@ -439,7 +588,7 @@ impl LineReader {
if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) { if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) {
break self.buffer_start + eol + 1; break self.buffer_start + eol + 1;
} }
if self.buffer_start > buffer.len() / 2 { if self.buffer_start > 0 {
buffer.copy_within(self.buffer_start..self.buffer_end, 0); buffer.copy_within(self.buffer_start..self.buffer_end, 0);
self.buffer_end -= self.buffer_start; self.buffer_end -= self.buffer_start;
self.buffer_start = 0; self.buffer_start = 0;
@ -480,7 +629,6 @@ mod tests {
use super::*; use super::*;
use std::error::Error; use std::error::Error;
use std::rc::Rc; use std::rc::Rc;
use std::str;
fn build_example() -> (Vec<Variable>, Vec<Vec<Option<Term>>>) { fn build_example() -> (Vec<Variable>, Vec<Vec<Option<Term>>>) {
( (
@ -530,21 +678,21 @@ mod tests {
} }
#[test] #[test]
fn test_csv_serialization() -> io::Result<()> { fn test_csv_serialization() {
let (variables, solutions) = build_example(); let (variables, solutions) = build_example();
let mut writer = CsvSolutionsWriter::start(Vec::new(), variables.clone())?; let mut buffer = String::new();
let writer = InnerCsvSolutionsWriter::start(&mut buffer, variables.clone());
let variables = Rc::new(variables); let variables = Rc::new(variables);
for solution in solutions { for solution in solutions {
writer.write( writer.write(
&mut buffer,
variables variables
.iter() .iter()
.zip(&solution) .zip(&solution)
.filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))), .filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))),
)?; );
} }
let result = writer.finish(); assert_eq!(buffer, "x,literal\r\nhttp://example/x,String\r\nhttp://example/x,\"String-with-dquote\"\"\"\r\n_:b0,Blank node\r\n,Missing 'x'\r\n,\r\nhttp://example/x,\r\n_:b1,String-with-lang\r\n_:b1,123\r\n,\"escape,\t\r\n\"\r\n");
assert_eq!(str::from_utf8(&result).unwrap(), "x,literal\r\nhttp://example/x,String\r\nhttp://example/x,\"String-with-dquote\"\"\"\r\n_:b0,Blank node\r\n,Missing 'x'\r\n,\r\nhttp://example/x,\r\n_:b1,String-with-lang\r\n_:b1,123\r\n,\"escape,\t\r\n\"\r\n");
Ok(())
} }
#[test] #[test]
@ -552,24 +700,25 @@ mod tests {
let (variables, solutions) = build_example(); let (variables, solutions) = build_example();
// Write // Write
let mut writer = TsvSolutionsWriter::start(Vec::new(), variables.clone())?; let mut buffer = String::new();
let writer = InnerTsvSolutionsWriter::start(&mut buffer, variables.clone());
let variables = Rc::new(variables); let variables = Rc::new(variables);
for solution in &solutions { for solution in &solutions {
writer.write( writer.write(
&mut buffer,
variables variables
.iter() .iter()
.zip(solution) .zip(solution)
.filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))), .filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))),
)?; );
} }
let result = writer.finish(); assert_eq!(buffer, "?x\t?literal\n<http://example/x>\t\"String\"\n<http://example/x>\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n<http://example/x>\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n");
assert_eq!(str::from_utf8(&result).unwrap(), "?x\t?literal\n<http://example/x>\t\"String\"\n<http://example/x>\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n<http://example/x>\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n");
// Read // Read
if let TsvQueryResultsReader::Solutions { if let TsvQueryResultsReader::Solutions {
solutions: mut solutions_iter, solutions: mut solutions_iter,
variables: actual_variables, variables: actual_variables,
} = TsvQueryResultsReader::read(result.as_slice())? } = TsvQueryResultsReader::read(buffer.as_bytes())?
{ {
assert_eq!(actual_variables.as_slice(), variables.as_slice()); assert_eq!(actual_variables.as_slice(), variables.as_slice());
let mut rows = Vec::new(); let mut rows = Vec::new();
@ -610,21 +759,19 @@ mod tests {
} }
#[test] #[test]
fn test_no_columns_csv_serialization() -> io::Result<()> { fn test_no_columns_csv_serialization() {
let mut writer = CsvSolutionsWriter::start(Vec::new(), Vec::new())?; let mut buffer = String::new();
writer.write([])?; let writer = InnerCsvSolutionsWriter::start(&mut buffer, Vec::new());
let result = writer.finish(); writer.write(&mut buffer, []);
assert_eq!(str::from_utf8(&result).unwrap(), "\r\n\r\n"); assert_eq!(buffer, "\r\n\r\n");
Ok(())
} }
#[test] #[test]
fn test_no_columns_tsv_serialization() -> io::Result<()> { fn test_no_columns_tsv_serialization() {
let mut writer = TsvSolutionsWriter::start(Vec::new(), Vec::new())?; let mut buffer = String::new();
writer.write([])?; let writer = InnerTsvSolutionsWriter::start(&mut buffer, Vec::new());
let result = writer.finish(); writer.write(&mut buffer, []);
assert_eq!(str::from_utf8(&result).unwrap(), "\n\n"); assert_eq!(buffer, "\n\n");
Ok(())
} }
#[test] #[test]
@ -644,19 +791,17 @@ mod tests {
} }
#[test] #[test]
fn test_no_results_csv_serialization() -> io::Result<()> { fn test_no_results_csv_serialization() {
let result = let mut buffer = String::new();
CsvSolutionsWriter::start(Vec::new(), vec![Variable::new_unchecked("a")])?.finish(); InnerCsvSolutionsWriter::start(&mut buffer, vec![Variable::new_unchecked("a")]);
assert_eq!(str::from_utf8(&result).unwrap(), "a\r\n"); assert_eq!(buffer, "a\r\n");
Ok(())
} }
#[test] #[test]
fn test_no_results_tsv_serialization() -> io::Result<()> { fn test_no_results_tsv_serialization() {
let result = let mut buffer = String::new();
TsvSolutionsWriter::start(Vec::new(), vec![Variable::new_unchecked("a")])?.finish(); InnerTsvSolutionsWriter::start(&mut buffer, vec![Variable::new_unchecked("a")]);
assert_eq!(str::from_utf8(&result).unwrap(), "?a\n"); assert_eq!(buffer, "?a\n");
Ok(())
} }
#[test] #[test]

@ -1,6 +1,8 @@
//! Implementation of [SPARQL Query Results JSON Format](https://www.w3.org/TR/sparql11-results-json/) //! Implementation of [SPARQL Query Results JSON Format](https://www.w3.org/TR/sparql11-results-json/)
use crate::error::{ParseError, SyntaxError}; use crate::error::{ParseError, SyntaxError};
#[cfg(feature = "async-tokio")]
use json_event_parser::ToTokioAsyncWriteJsonWriter;
use json_event_parser::{FromReadJsonReader, JsonEvent, ToWriteJsonWriter}; use json_event_parser::{FromReadJsonReader, JsonEvent, ToWriteJsonWriter};
use oxrdf::vocab::rdf; use oxrdf::vocab::rdf;
use oxrdf::Variable; use oxrdf::Variable;
@ -8,6 +10,8 @@ use oxrdf::*;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::mem::take; use std::mem::take;
#[cfg(feature = "async-tokio")]
use tokio::io::AsyncWrite;
/// This limit is set in order to avoid stack overflow error when parsing nested triples due to too many recursive calls. /// This limit is set in order to avoid stack overflow error when parsing nested triples due to too many recursive calls.
/// The actual limit value is a wet finger compromise between not failing to parse valid files and avoiding to trigger stack overflow errors. /// The actual limit value is a wet finger compromise between not failing to parse valid files and avoiding to trigger stack overflow errors.
@ -15,116 +19,210 @@ const MAX_NUMBER_OF_NESTED_TRIPLES: usize = 128;
pub fn write_boolean_json_result<W: Write>(write: W, value: bool) -> io::Result<W> { pub fn write_boolean_json_result<W: Write>(write: W, value: bool) -> io::Result<W> {
let mut writer = ToWriteJsonWriter::new(write); let mut writer = ToWriteJsonWriter::new(write);
writer.write_event(JsonEvent::StartObject)?; for event in inner_write_boolean_json_result(value) {
writer.write_event(JsonEvent::ObjectKey("head".into()))?; writer.write_event(event)?;
writer.write_event(JsonEvent::StartObject)?; }
writer.write_event(JsonEvent::EndObject)?; writer.finish()
writer.write_event(JsonEvent::ObjectKey("boolean".into()))?; }
writer.write_event(JsonEvent::Boolean(value))?;
writer.write_event(JsonEvent::EndObject)?; #[cfg(feature = "async-tokio")]
pub async fn tokio_async_write_boolean_json_result<W: AsyncWrite + Unpin>(
write: W,
value: bool,
) -> io::Result<W> {
let mut writer = ToTokioAsyncWriteJsonWriter::new(write);
for event in inner_write_boolean_json_result(value) {
writer.write_event(event).await?;
}
writer.finish() writer.finish()
} }
pub struct JsonSolutionsWriter<W: Write> { fn inner_write_boolean_json_result(value: bool) -> [JsonEvent<'static>; 7] {
[
JsonEvent::StartObject,
JsonEvent::ObjectKey("head".into()),
JsonEvent::StartObject,
JsonEvent::EndObject,
JsonEvent::ObjectKey("boolean".into()),
JsonEvent::Boolean(value),
JsonEvent::EndObject,
]
}
pub struct ToWriteJsonSolutionsWriter<W: Write> {
inner: InnerJsonSolutionsWriter,
writer: ToWriteJsonWriter<W>, writer: ToWriteJsonWriter<W>,
} }
impl<W: Write> JsonSolutionsWriter<W> { impl<W: Write> ToWriteJsonSolutionsWriter<W> {
pub fn start(write: W, variables: &[Variable]) -> io::Result<Self> { pub fn start(write: W, variables: &[Variable]) -> io::Result<Self> {
let mut writer = ToWriteJsonWriter::new(write); let mut writer = ToWriteJsonWriter::new(write);
writer.write_event(JsonEvent::StartObject)?; let mut buffer = Vec::with_capacity(48);
writer.write_event(JsonEvent::ObjectKey("head".into()))?; let inner = InnerJsonSolutionsWriter::start(&mut buffer, variables);
writer.write_event(JsonEvent::StartObject)?; Self::do_write(&mut writer, buffer)?;
writer.write_event(JsonEvent::ObjectKey("vars".into()))?; Ok(Self { inner, writer })
writer.write_event(JsonEvent::StartArray)?;
for variable in variables {
writer.write_event(JsonEvent::String(variable.as_str().into()))?;
}
writer.write_event(JsonEvent::EndArray)?;
writer.write_event(JsonEvent::EndObject)?;
writer.write_event(JsonEvent::ObjectKey("results".into()))?;
writer.write_event(JsonEvent::StartObject)?;
writer.write_event(JsonEvent::ObjectKey("bindings".into()))?;
writer.write_event(JsonEvent::StartArray)?;
Ok(Self { writer })
} }
pub fn write<'a>( pub fn write<'a>(
&mut self, &mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>, solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> { ) -> io::Result<()> {
self.writer.write_event(JsonEvent::StartObject)?; let mut buffer = Vec::with_capacity(48);
for (variable, value) in solution { self.inner.write(&mut buffer, solution);
self.writer Self::do_write(&mut self.writer, buffer)
.write_event(JsonEvent::ObjectKey(variable.as_str().into()))?;
write_json_term(value, &mut self.writer)?;
}
self.writer.write_event(JsonEvent::EndObject)?;
Ok(())
} }
pub fn finish(mut self) -> io::Result<W> { pub fn finish(mut self) -> io::Result<W> {
self.writer.write_event(JsonEvent::EndArray)?; let mut buffer = Vec::with_capacity(4);
self.writer.write_event(JsonEvent::EndObject)?; self.inner.finish(&mut buffer);
self.writer.write_event(JsonEvent::EndObject)?; Self::do_write(&mut self.writer, buffer)?;
self.writer.finish() self.writer.finish()
} }
fn do_write(writer: &mut ToWriteJsonWriter<W>, output: Vec<JsonEvent<'_>>) -> io::Result<()> {
for event in output {
writer.write_event(event)?;
}
Ok(())
}
}
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteJsonSolutionsWriter<W: AsyncWrite + Unpin> {
inner: InnerJsonSolutionsWriter,
writer: ToTokioAsyncWriteJsonWriter<W>,
} }
fn write_json_term( #[cfg(feature = "async-tokio")]
term: TermRef<'_>, impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteJsonSolutionsWriter<W> {
writer: &mut ToWriteJsonWriter<impl Write>, pub async fn start(write: W, variables: &[Variable]) -> io::Result<Self> {
let mut writer = ToTokioAsyncWriteJsonWriter::new(write);
let mut buffer = Vec::with_capacity(48);
let inner = InnerJsonSolutionsWriter::start(&mut buffer, variables);
Self::do_write(&mut writer, buffer).await?;
Ok(Self { writer, inner })
}
pub async fn write<'a>(
&mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> { ) -> io::Result<()> {
let mut buffer = Vec::with_capacity(48);
self.inner.write(&mut buffer, solution);
Self::do_write(&mut self.writer, buffer).await
}
pub async fn finish(mut self) -> io::Result<W> {
let mut buffer = Vec::with_capacity(4);
self.inner.finish(&mut buffer);
Self::do_write(&mut self.writer, buffer).await?;
self.writer.finish()
}
async fn do_write(
writer: &mut ToTokioAsyncWriteJsonWriter<W>,
output: Vec<JsonEvent<'_>>,
) -> io::Result<()> {
for event in output {
writer.write_event(event).await?;
}
Ok(())
}
}
struct InnerJsonSolutionsWriter;
impl InnerJsonSolutionsWriter {
fn start<'a>(output: &mut Vec<JsonEvent<'a>>, variables: &'a [Variable]) -> Self {
output.push(JsonEvent::StartObject);
output.push(JsonEvent::ObjectKey("head".into()));
output.push(JsonEvent::StartObject);
output.push(JsonEvent::ObjectKey("vars".into()));
output.push(JsonEvent::StartArray);
for variable in variables {
output.push(JsonEvent::String(variable.as_str().into()));
}
output.push(JsonEvent::EndArray);
output.push(JsonEvent::EndObject);
output.push(JsonEvent::ObjectKey("results".into()));
output.push(JsonEvent::StartObject);
output.push(JsonEvent::ObjectKey("bindings".into()));
output.push(JsonEvent::StartArray);
Self {}
}
#[allow(clippy::unused_self)]
fn write<'a>(
&self,
output: &mut Vec<JsonEvent<'a>>,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) {
output.push(JsonEvent::StartObject);
for (variable, value) in solution {
output.push(JsonEvent::ObjectKey(variable.as_str().into()));
write_json_term(output, value);
}
output.push(JsonEvent::EndObject);
}
#[allow(clippy::unused_self)]
fn finish(self, output: &mut Vec<JsonEvent<'_>>) {
output.push(JsonEvent::EndArray);
output.push(JsonEvent::EndObject);
output.push(JsonEvent::EndObject);
}
}
fn write_json_term<'a>(output: &mut Vec<JsonEvent<'a>>, term: TermRef<'a>) {
match term { match term {
TermRef::NamedNode(uri) => { TermRef::NamedNode(uri) => {
writer.write_event(JsonEvent::StartObject)?; output.push(JsonEvent::StartObject);
writer.write_event(JsonEvent::ObjectKey("type".into()))?; output.push(JsonEvent::ObjectKey("type".into()));
writer.write_event(JsonEvent::String("uri".into()))?; output.push(JsonEvent::String("uri".into()));
writer.write_event(JsonEvent::ObjectKey("value".into()))?; output.push(JsonEvent::ObjectKey("value".into()));
writer.write_event(JsonEvent::String(uri.as_str().into()))?; output.push(JsonEvent::String(uri.as_str().into()));
writer.write_event(JsonEvent::EndObject)?; output.push(JsonEvent::EndObject);
} }
TermRef::BlankNode(bnode) => { TermRef::BlankNode(bnode) => {
writer.write_event(JsonEvent::StartObject)?; output.push(JsonEvent::StartObject);
writer.write_event(JsonEvent::ObjectKey("type".into()))?; output.push(JsonEvent::ObjectKey("type".into()));
writer.write_event(JsonEvent::String("bnode".into()))?; output.push(JsonEvent::String("bnode".into()));
writer.write_event(JsonEvent::ObjectKey("value".into()))?; output.push(JsonEvent::ObjectKey("value".into()));
writer.write_event(JsonEvent::String(bnode.as_str().into()))?; output.push(JsonEvent::String(bnode.as_str().into()));
writer.write_event(JsonEvent::EndObject)?; output.push(JsonEvent::EndObject);
} }
TermRef::Literal(literal) => { TermRef::Literal(literal) => {
writer.write_event(JsonEvent::StartObject)?; output.push(JsonEvent::StartObject);
writer.write_event(JsonEvent::ObjectKey("type".into()))?; output.push(JsonEvent::ObjectKey("type".into()));
writer.write_event(JsonEvent::String("literal".into()))?; output.push(JsonEvent::String("literal".into()));
writer.write_event(JsonEvent::ObjectKey("value".into()))?; output.push(JsonEvent::ObjectKey("value".into()));
writer.write_event(JsonEvent::String(literal.value().into()))?; output.push(JsonEvent::String(literal.value().into()));
if let Some(language) = literal.language() { if let Some(language) = literal.language() {
writer.write_event(JsonEvent::ObjectKey("xml:lang".into()))?; output.push(JsonEvent::ObjectKey("xml:lang".into()));
writer.write_event(JsonEvent::String(language.into()))?; output.push(JsonEvent::String(language.into()));
} else if !literal.is_plain() { } else if !literal.is_plain() {
writer.write_event(JsonEvent::ObjectKey("datatype".into()))?; output.push(JsonEvent::ObjectKey("datatype".into()));
writer.write_event(JsonEvent::String(literal.datatype().as_str().into()))?; output.push(JsonEvent::String(literal.datatype().as_str().into()));
} }
writer.write_event(JsonEvent::EndObject)?; output.push(JsonEvent::EndObject);
} }
#[cfg(feature = "rdf-star")] #[cfg(feature = "rdf-star")]
TermRef::Triple(triple) => { TermRef::Triple(triple) => {
writer.write_event(JsonEvent::StartObject)?; output.push(JsonEvent::StartObject);
writer.write_event(JsonEvent::ObjectKey("type".into()))?; output.push(JsonEvent::ObjectKey("type".into()));
writer.write_event(JsonEvent::String("triple".into()))?; output.push(JsonEvent::String("triple".into()));
writer.write_event(JsonEvent::ObjectKey("value".into()))?; output.push(JsonEvent::ObjectKey("value".into()));
writer.write_event(JsonEvent::StartObject)?; output.push(JsonEvent::StartObject);
writer.write_event(JsonEvent::ObjectKey("subject".into()))?; output.push(JsonEvent::ObjectKey("subject".into()));
write_json_term(triple.subject.as_ref().into(), writer)?; write_json_term(output, triple.subject.as_ref().into());
writer.write_event(JsonEvent::ObjectKey("predicate".into()))?; output.push(JsonEvent::ObjectKey("predicate".into()));
write_json_term(triple.predicate.as_ref().into(), writer)?; write_json_term(output, triple.predicate.as_ref().into());
writer.write_event(JsonEvent::ObjectKey("object".into()))?; output.push(JsonEvent::ObjectKey("object".into()));
write_json_term(triple.object.as_ref(), writer)?; write_json_term(output, triple.object.as_ref());
writer.write_event(JsonEvent::EndObject)?; output.push(JsonEvent::EndObject);
writer.write_event(JsonEvent::EndObject)?; output.push(JsonEvent::EndObject);
} }
} }
Ok(())
} }
pub enum JsonQueryResultsReader<R: Read> { pub enum JsonQueryResultsReader<R: Read> {

@ -1,11 +1,20 @@
#[cfg(feature = "async-tokio")]
use crate::csv::{ use crate::csv::{
write_boolean_csv_result, write_boolean_tsv_result, CsvSolutionsWriter, TsvSolutionsWriter, tokio_async_write_boolean_csv_result, ToTokioAsyncWriteCsvSolutionsWriter,
ToTokioAsyncWriteTsvSolutionsWriter,
}; };
use crate::csv::{write_boolean_csv_result, ToWriteCsvSolutionsWriter, ToWriteTsvSolutionsWriter};
use crate::format::QueryResultsFormat; use crate::format::QueryResultsFormat;
use crate::json::{write_boolean_json_result, JsonSolutionsWriter}; #[cfg(feature = "async-tokio")]
use crate::xml::{write_boolean_xml_result, XmlSolutionsWriter}; use crate::json::{tokio_async_write_boolean_json_result, ToTokioAsyncWriteJsonSolutionsWriter};
use crate::json::{write_boolean_json_result, ToWriteJsonSolutionsWriter};
#[cfg(feature = "async-tokio")]
use crate::xml::{tokio_async_write_boolean_xml_result, ToTokioAsyncWriteXmlSolutionsWriter};
use crate::xml::{write_boolean_xml_result, ToWriteXmlSolutionsWriter};
use oxrdf::{TermRef, Variable, VariableRef}; use oxrdf::{TermRef, Variable, VariableRef};
use std::io::{self, Write}; use std::io::{self, Write};
#[cfg(feature = "async-tokio")]
use tokio::io::AsyncWrite;
/// A serializer for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats. /// A serializer for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats.
/// ///
@ -15,7 +24,7 @@ use std::io::{self, Write};
/// * [SPARQL Query Results CSV Format](https://www.w3.org/TR/sparql11-results-csv-tsv/) ([`QueryResultsFormat::Csv`](QueryResultsFormat::Csv)) /// * [SPARQL Query Results CSV Format](https://www.w3.org/TR/sparql11-results-csv-tsv/) ([`QueryResultsFormat::Csv`](QueryResultsFormat::Csv))
/// * [SPARQL Query Results TSV Format](https://www.w3.org/TR/sparql11-results-csv-tsv/) ([`QueryResultsFormat::Tsv`](QueryResultsFormat::Tsv)) /// * [SPARQL Query Results TSV Format](https://www.w3.org/TR/sparql11-results-csv-tsv/) ([`QueryResultsFormat::Tsv`](QueryResultsFormat::Tsv))
/// ///
/// Example in JSON (the API is the same for XML and TSV): /// Example in JSON (the API is the same for XML, CSV and TSV):
/// ``` /// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// use sparesults::{QueryResultsFormat, QueryResultsSerializer};
/// use oxrdf::{LiteralRef, Variable, VariableRef}; /// use oxrdf::{LiteralRef, Variable, VariableRef};
@ -49,13 +58,13 @@ impl QueryResultsSerializer {
/// Write a boolean query result (from an `ASK` query) into the given [`Write`] implementation. /// Write a boolean query result (from an `ASK` query) into the given [`Write`] implementation.
/// ///
/// Example in XML (the API is the same for JSON and TSV): /// Example in XML (the API is the same for JSON, CSV and TSV):
/// ``` /// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// use sparesults::{QueryResultsFormat, QueryResultsSerializer};
/// ///
/// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml); /// let xml_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml);
/// let mut buffer = Vec::new(); /// let mut buffer = Vec::new();
/// json_serializer.serialize_boolean_to_write(&mut buffer, true)?; /// xml_serializer.serialize_boolean_to_write(&mut buffer, true)?;
/// assert_eq!(buffer, b"<?xml version=\"1.0\"?><sparql xmlns=\"http://www.w3.org/2005/sparql-results#\"><head></head><boolean>true</boolean></sparql>"); /// assert_eq!(buffer, b"<?xml version=\"1.0\"?><sparql xmlns=\"http://www.w3.org/2005/sparql-results#\"><head></head><boolean>true</boolean></sparql>");
/// # std::io::Result::Ok(()) /// # std::io::Result::Ok(())
/// ``` /// ```
@ -63,8 +72,39 @@ impl QueryResultsSerializer {
match self.format { match self.format {
QueryResultsFormat::Xml => write_boolean_xml_result(write, value), QueryResultsFormat::Xml => write_boolean_xml_result(write, value),
QueryResultsFormat::Json => write_boolean_json_result(write, value), QueryResultsFormat::Json => write_boolean_json_result(write, value),
QueryResultsFormat::Csv => write_boolean_csv_result(write, value), QueryResultsFormat::Csv | QueryResultsFormat::Tsv => {
QueryResultsFormat::Tsv => write_boolean_tsv_result(write, value), write_boolean_csv_result(write, value)
}
}
}
/// Write a boolean query result (from an `ASK` query) into the given [`AsyncWrite`] implementation.
///
/// Example in JSON (the API is the same for XML, CSV and TSV):
/// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Json);
/// let mut buffer = Vec::new();
/// json_serializer.serialize_boolean_to_tokio_async_write(&mut buffer, false).await?;
/// assert_eq!(buffer, b"{\"head\":{},\"boolean\":false}");
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub async fn serialize_boolean_to_tokio_async_write<W: AsyncWrite + Unpin>(
&self,
write: W,
value: bool,
) -> io::Result<W> {
match self.format {
QueryResultsFormat::Xml => tokio_async_write_boolean_xml_result(write, value).await,
QueryResultsFormat::Json => tokio_async_write_boolean_json_result(write, value).await,
QueryResultsFormat::Csv | QueryResultsFormat::Tsv => {
tokio_async_write_boolean_csv_result(write, value).await
}
} }
} }
@ -79,15 +119,15 @@ impl QueryResultsSerializer {
/// ///
/// <div class="warning">This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.</div> /// <div class="warning">This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.</div>
/// ///
/// Example in XML (the API is the same for JSON and TSV): /// Example in XML (the API is the same for JSON, CSV and TSV):
/// ``` /// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// use sparesults::{QueryResultsFormat, QueryResultsSerializer};
/// use oxrdf::{LiteralRef, Variable, VariableRef}; /// use oxrdf::{LiteralRef, Variable, VariableRef};
/// use std::iter::once; /// use std::iter::once;
/// ///
/// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml); /// let xml_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Xml);
/// let mut buffer = Vec::new(); /// let mut buffer = Vec::new();
/// let mut writer = json_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?; /// let mut writer = xml_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?;
/// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test"))))?; /// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test"))))?;
/// writer.finish()?; /// writer.finish()?;
/// assert_eq!(buffer, b"<?xml version=\"1.0\"?><sparql xmlns=\"http://www.w3.org/2005/sparql-results#\"><head><variable name=\"foo\"/><variable name=\"bar\"/></head><results><result><binding name=\"foo\"><literal>test</literal></binding></result></results></sparql>"); /// assert_eq!(buffer, b"<?xml version=\"1.0\"?><sparql xmlns=\"http://www.w3.org/2005/sparql-results#\"><head><variable name=\"foo\"/><variable name=\"bar\"/></head><results><result><binding name=\"foo\"><literal>test</literal></binding></result></results></sparql>");
@ -100,18 +140,65 @@ impl QueryResultsSerializer {
) -> io::Result<ToWriteSolutionsWriter<W>> { ) -> io::Result<ToWriteSolutionsWriter<W>> {
Ok(ToWriteSolutionsWriter { Ok(ToWriteSolutionsWriter {
formatter: match self.format { formatter: match self.format {
QueryResultsFormat::Xml => { QueryResultsFormat::Xml => ToWriteSolutionsWriterKind::Xml(
ToWriteSolutionsWriterKind::Xml(XmlSolutionsWriter::start(write, &variables)?) ToWriteXmlSolutionsWriter::start(write, &variables)?,
} ),
QueryResultsFormat::Json => { QueryResultsFormat::Json => ToWriteSolutionsWriterKind::Json(
ToWriteSolutionsWriterKind::Json(JsonSolutionsWriter::start(write, &variables)?) ToWriteJsonSolutionsWriter::start(write, &variables)?,
} ),
QueryResultsFormat::Csv => { QueryResultsFormat::Csv => ToWriteSolutionsWriterKind::Csv(
ToWriteSolutionsWriterKind::Csv(CsvSolutionsWriter::start(write, variables)?) ToWriteCsvSolutionsWriter::start(write, variables)?,
} ),
QueryResultsFormat::Tsv => { QueryResultsFormat::Tsv => ToWriteSolutionsWriterKind::Tsv(
ToWriteSolutionsWriterKind::Tsv(TsvSolutionsWriter::start(write, variables)?) ToWriteTsvSolutionsWriter::start(write, variables)?,
),
},
})
} }
/// Returns a `SolutionsWriter` allowing writing query solutions into the given [`Write`] implementation.
///
/// <div class="warning">Do not forget to run the [`finish`](ToWriteSolutionsWriter::finish()) method to properly write the last bytes of the file.</div>
///
/// <div class="warning">This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.</div>
///
/// Example in XML (the API is the same for JSON, CSV and TSV):
/// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer};
/// use oxrdf::{LiteralRef, Variable, VariableRef};
/// use std::iter::once;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Json);
/// let mut buffer = Vec::new();
/// let mut writer = json_serializer.serialize_solutions_to_tokio_async_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]).await?;
/// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test")))).await?;
/// writer.finish().await?;
/// assert_eq!(buffer, b"{\"head\":{\"vars\":[\"foo\",\"bar\"]},\"results\":{\"bindings\":[{\"foo\":{\"type\":\"literal\",\"value\":\"test\"}}]}}");
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "async-tokio")]
pub async fn serialize_solutions_to_tokio_async_write<W: AsyncWrite + Unpin>(
&self,
write: W,
variables: Vec<Variable>,
) -> io::Result<ToTokioAsyncWriteSolutionsWriter<W>> {
Ok(ToTokioAsyncWriteSolutionsWriter {
formatter: match self.format {
QueryResultsFormat::Xml => ToTokioAsyncWriteSolutionsWriterKind::Xml(
ToTokioAsyncWriteXmlSolutionsWriter::start(write, &variables).await?,
),
QueryResultsFormat::Json => ToTokioAsyncWriteSolutionsWriterKind::Json(
ToTokioAsyncWriteJsonSolutionsWriter::start(write, &variables).await?,
),
QueryResultsFormat::Csv => ToTokioAsyncWriteSolutionsWriterKind::Csv(
ToTokioAsyncWriteCsvSolutionsWriter::start(write, variables).await?,
),
QueryResultsFormat::Tsv => ToTokioAsyncWriteSolutionsWriterKind::Tsv(
ToTokioAsyncWriteTsvSolutionsWriter::start(write, variables).await?,
),
}, },
}) })
} }
@ -126,22 +213,23 @@ impl QueryResultsSerializer {
} }
} }
/// Allows writing query results. /// Allows writing query results into a [`Write`] implementation.
///
/// Could be built using a [`QueryResultsSerializer`]. /// Could be built using a [`QueryResultsSerializer`].
/// ///
/// <div class="warning">Do not forget to run the [`finish`](ToWriteSolutionsWriter::finish()) method to properly write the last bytes of the file.</div> /// <div class="warning">Do not forget to run the [`finish`](ToWriteSolutionsWriter::finish()) method to properly write the last bytes of the file.</div>
/// ///
/// <div class="warning">This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.</div> /// <div class="warning">This writer does unbuffered writes. You might want to use [`BufWriter`](io::BufWriter) to avoid that.</div>
/// ///
/// Example in TSV (the API is the same for JSON and XML): /// Example in TSV (the API is the same for JSON, XML and CSV):
/// ``` /// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer}; /// use sparesults::{QueryResultsFormat, QueryResultsSerializer};
/// use oxrdf::{LiteralRef, Variable, VariableRef}; /// use oxrdf::{LiteralRef, Variable, VariableRef};
/// use std::iter::once; /// use std::iter::once;
/// ///
/// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Tsv); /// let tsv_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Tsv);
/// let mut buffer = Vec::new(); /// let mut buffer = Vec::new();
/// let mut writer = json_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?; /// let mut writer = tsv_serializer.serialize_solutions_to_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")])?;
/// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test"))))?; /// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test"))))?;
/// writer.finish()?; /// writer.finish()?;
/// assert_eq!(buffer, b"?foo\t?bar\n\"test\"\t\n"); /// assert_eq!(buffer, b"?foo\t?bar\n\"test\"\t\n");
@ -153,16 +241,16 @@ pub struct ToWriteSolutionsWriter<W: Write> {
} }
enum ToWriteSolutionsWriterKind<W: Write> { enum ToWriteSolutionsWriterKind<W: Write> {
Xml(XmlSolutionsWriter<W>), Xml(ToWriteXmlSolutionsWriter<W>),
Json(JsonSolutionsWriter<W>), Json(ToWriteJsonSolutionsWriter<W>),
Csv(CsvSolutionsWriter<W>), Csv(ToWriteCsvSolutionsWriter<W>),
Tsv(TsvSolutionsWriter<W>), Tsv(ToWriteTsvSolutionsWriter<W>),
} }
impl<W: Write> ToWriteSolutionsWriter<W> { impl<W: Write> ToWriteSolutionsWriter<W> {
/// Writes a solution. /// Writes a solution.
/// ///
/// Example in JSON (the API is the same for XML and TSV): /// Example in JSON (the API is the same for XML, CSV and TSV):
/// ``` /// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer, QuerySolution}; /// use sparesults::{QueryResultsFormat, QueryResultsSerializer, QuerySolution};
/// use oxrdf::{Literal, LiteralRef, Variable, VariableRef}; /// use oxrdf::{Literal, LiteralRef, Variable, VariableRef};
@ -200,3 +288,88 @@ impl<W: Write> ToWriteSolutionsWriter<W> {
} }
} }
} }
/// Allows writing query results into an [`AsyncWrite`] implementation.
/// Could be built using a [`QueryResultsSerializer`].
///
/// <div class="warning">Do not forget to run the [`finish`](ToTokioAsyncWriteSolutionsWriter::finish()) method to properly write the last bytes of the file.</div>
///
/// <div class="warning">This writer does unbuffered writes. You might want to use [`BufWriter`](tokio::io::BufWriter) to avoid that.</div>
///
/// Example in TSV (the API is the same for JSON, CSV and XML):
/// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer};
/// use oxrdf::{LiteralRef, Variable, VariableRef};
/// use std::iter::once;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let tsv_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Tsv);
/// let mut buffer = Vec::new();
/// let mut writer = tsv_serializer.serialize_solutions_to_tokio_async_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]).await?;
/// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test")))).await?;
/// writer.finish().await?;
/// assert_eq!(buffer, b"?foo\t?bar\n\"test\"\t\n");
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "async-tokio")]
#[must_use]
pub struct ToTokioAsyncWriteSolutionsWriter<W: AsyncWrite + Unpin> {
formatter: ToTokioAsyncWriteSolutionsWriterKind<W>,
}
#[cfg(feature = "async-tokio")]
enum ToTokioAsyncWriteSolutionsWriterKind<W: AsyncWrite + Unpin> {
Xml(ToTokioAsyncWriteXmlSolutionsWriter<W>),
Json(ToTokioAsyncWriteJsonSolutionsWriter<W>),
Csv(ToTokioAsyncWriteCsvSolutionsWriter<W>),
Tsv(ToTokioAsyncWriteTsvSolutionsWriter<W>),
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteSolutionsWriter<W> {
/// Writes a solution.
///
/// Example in JSON (the API is the same for XML, CSV and TSV):
/// ```
/// use sparesults::{QueryResultsFormat, QueryResultsSerializer, QuerySolution};
/// use oxrdf::{Literal, LiteralRef, Variable, VariableRef};
/// use std::iter::once;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let json_serializer = QueryResultsSerializer::from_format(QueryResultsFormat::Json);
/// let mut buffer = Vec::new();
/// let mut writer = json_serializer.serialize_solutions_to_tokio_async_write(&mut buffer, vec![Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]).await?;
/// writer.write(once((VariableRef::new_unchecked("foo"), LiteralRef::from("test")))).await?;
/// writer.write(&QuerySolution::from((vec![Variable::new_unchecked("bar")], vec![Some(Literal::from("test").into())]))).await?;
/// writer.finish().await?;
/// assert_eq!(buffer, b"{\"head\":{\"vars\":[\"foo\",\"bar\"]},\"results\":{\"bindings\":[{\"foo\":{\"type\":\"literal\",\"value\":\"test\"}},{\"bar\":{\"type\":\"literal\",\"value\":\"test\"}}]}}");
/// # Ok(())
/// # }
/// ```
pub async fn write<'a>(
&mut self,
solution: impl IntoIterator<Item = (impl Into<VariableRef<'a>>, impl Into<TermRef<'a>>)>,
) -> io::Result<()> {
let solution = solution.into_iter().map(|(v, s)| (v.into(), s.into()));
match &mut self.formatter {
ToTokioAsyncWriteSolutionsWriterKind::Xml(writer) => writer.write(solution).await,
ToTokioAsyncWriteSolutionsWriterKind::Json(writer) => writer.write(solution).await,
ToTokioAsyncWriteSolutionsWriterKind::Csv(writer) => writer.write(solution).await,
ToTokioAsyncWriteSolutionsWriterKind::Tsv(writer) => writer.write(solution).await,
}
}
/// Writes the last bytes of the file.
pub async fn finish(self) -> io::Result<W> {
match self.formatter {
ToTokioAsyncWriteSolutionsWriterKind::Xml(write) => write.finish().await,
ToTokioAsyncWriteSolutionsWriterKind::Json(write) => write.finish().await,
ToTokioAsyncWriteSolutionsWriterKind::Csv(write) => Ok(write.finish()),
ToTokioAsyncWriteSolutionsWriterKind::Tsv(write) => Ok(write.finish()),
}
}
}

@ -11,147 +11,212 @@ use std::collections::BTreeMap;
use std::io::{self, BufReader, Read, Write}; use std::io::{self, BufReader, Read, Write};
use std::str; use std::str;
use std::sync::Arc; use std::sync::Arc;
#[cfg(feature = "async-tokio")]
use tokio::io::AsyncWrite;
pub fn write_boolean_xml_result<W: Write>(sink: W, value: bool) -> io::Result<W> { pub fn write_boolean_xml_result<W: Write>(write: W, value: bool) -> io::Result<W> {
do_write_boolean_xml_result(sink, value).map_err(map_xml_error) let mut writer = Writer::new(write);
for event in inner_write_boolean_xml_result(value) {
writer.write_event(event).map_err(map_xml_error)?;
}
Ok(writer.into_inner())
} }
fn do_write_boolean_xml_result<W: Write>(sink: W, value: bool) -> Result<W, quick_xml::Error> { #[cfg(feature = "async-tokio")]
let mut writer = Writer::new(sink); pub async fn tokio_async_write_boolean_xml_result<W: AsyncWrite + Unpin>(
writer.write_event(Event::Decl(BytesDecl::new("1.0", None, None)))?; write: W,
value: bool,
) -> io::Result<W> {
let mut writer = Writer::new(write);
for event in inner_write_boolean_xml_result(value) {
writer writer
.create_element("sparql") .write_event_async(event)
.with_attribute(("xmlns", "http://www.w3.org/2005/sparql-results#")) .await
.write_inner_content(|writer| { .map_err(map_xml_error)?;
writer }
.create_element("head")
.write_text_content(BytesText::new(""))?
.create_element("boolean")
.write_text_content(BytesText::new(if value { "true" } else { "false" }))?;
quick_xml::Result::Ok(())
})?;
Ok(writer.into_inner()) Ok(writer.into_inner())
} }
pub struct XmlSolutionsWriter<W: Write> { fn inner_write_boolean_xml_result(value: bool) -> [Event<'static>; 8] {
writer: Writer<W>, [
Event::Decl(BytesDecl::new("1.0", None, None)),
Event::Start(
BytesStart::new("sparql")
.with_attributes([("xmlns", "http://www.w3.org/2005/sparql-results#")]),
),
Event::Start(BytesStart::new("head")),
Event::End(BytesEnd::new("head")),
Event::Start(BytesStart::new("boolean")),
Event::Text(BytesText::new(if value { "true" } else { "false" })),
Event::End(BytesEnd::new("boolean")),
Event::End(BytesEnd::new("sparql")),
]
} }
impl<W: Write> XmlSolutionsWriter<W> { pub struct ToWriteXmlSolutionsWriter<W: Write> {
pub fn start(sink: W, variables: &[Variable]) -> io::Result<Self> { inner: InnerXmlSolutionsWriter,
Self::do_start(sink, variables).map_err(map_xml_error) writer: Writer<W>,
} }
fn do_start(sink: W, variables: &[Variable]) -> Result<Self, quick_xml::Error> { impl<W: Write> ToWriteXmlSolutionsWriter<W> {
let mut writer = Writer::new(sink); pub fn start(write: W, variables: &[Variable]) -> io::Result<Self> {
writer.write_event(Event::Decl(BytesDecl::new("1.0", None, None)))?; let mut writer = Writer::new(write);
let mut sparql_open = BytesStart::new("sparql"); let mut buffer = Vec::with_capacity(48);
sparql_open.push_attribute(("xmlns", "http://www.w3.org/2005/sparql-results#")); let inner = InnerXmlSolutionsWriter::start(&mut buffer, variables);
writer.write_event(Event::Start(sparql_open))?; Self::do_write(&mut writer, buffer)?;
writer Ok(Self { inner, writer })
.create_element("head")
.write_inner_content(|writer| {
for variable in variables {
writer
.create_element("variable")
.with_attribute(("name", variable.as_str()))
.write_empty()?;
}
quick_xml::Result::Ok(())
})?;
writer.write_event(Event::Start(BytesStart::new("results")))?;
Ok(Self { writer })
} }
pub fn write<'a>( pub fn write<'a>(
&mut self, &mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>, solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> io::Result<()> { ) -> io::Result<()> {
self.do_write(solution).map_err(map_xml_error) let mut buffer = Vec::with_capacity(48);
self.inner.write(&mut buffer, solution);
Self::do_write(&mut self.writer, buffer)
}
pub fn finish(mut self) -> io::Result<W> {
let mut buffer = Vec::with_capacity(4);
self.inner.finish(&mut buffer);
Self::do_write(&mut self.writer, buffer)?;
Ok(self.writer.into_inner())
} }
fn do_write<'a>( fn do_write(writer: &mut Writer<W>, output: Vec<Event<'_>>) -> io::Result<()> {
for event in output {
writer.write_event(event).map_err(map_xml_error)?;
}
Ok(())
}
}
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteXmlSolutionsWriter<W: AsyncWrite + Unpin> {
inner: InnerXmlSolutionsWriter,
writer: Writer<W>,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteXmlSolutionsWriter<W> {
pub async fn start(write: W, variables: &[Variable]) -> io::Result<Self> {
let mut writer = Writer::new(write);
let mut buffer = Vec::with_capacity(48);
let inner = InnerXmlSolutionsWriter::start(&mut buffer, variables);
Self::do_write(&mut writer, buffer).await?;
Ok(Self { writer, inner })
}
pub async fn write<'a>(
&mut self, &mut self,
solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>, solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) -> Result<(), quick_xml::Error> { ) -> io::Result<()> {
self.writer let mut buffer = Vec::with_capacity(48);
.create_element("result") self.inner.write(&mut buffer, solution);
.write_inner_content(|writer| { Self::do_write(&mut self.writer, buffer).await
for (variable, value) in solution { }
pub async fn finish(mut self) -> io::Result<W> {
let mut buffer = Vec::with_capacity(4);
self.inner.finish(&mut buffer);
Self::do_write(&mut self.writer, buffer).await?;
Ok(self.writer.into_inner())
}
async fn do_write(writer: &mut Writer<W>, output: Vec<Event<'_>>) -> io::Result<()> {
for event in output {
writer writer
.create_element("binding") .write_event_async(event)
.with_attribute(("name", variable.as_str())) .await
.write_inner_content(|writer| write_xml_term(value, writer))?; .map_err(map_xml_error)?;
} }
quick_xml::Result::Ok(())
})?;
Ok(()) Ok(())
} }
}
struct InnerXmlSolutionsWriter;
pub fn finish(self) -> io::Result<W> { impl InnerXmlSolutionsWriter {
self.do_finish().map_err(map_xml_error) fn start<'a>(output: &mut Vec<Event<'a>>, variables: &'a [Variable]) -> Self {
output.push(Event::Decl(BytesDecl::new("1.0", None, None)));
output.push(Event::Start(BytesStart::new("sparql").with_attributes([(
"xmlns",
"http://www.w3.org/2005/sparql-results#",
)])));
output.push(Event::Start(BytesStart::new("head")));
for variable in variables {
output.push(Event::Empty(
BytesStart::new("variable").with_attributes([("name", variable.as_str())]),
));
}
output.push(Event::End(BytesEnd::new("head")));
output.push(Event::Start(BytesStart::new("results")));
Self {}
} }
fn do_finish(mut self) -> Result<W, quick_xml::Error> { #[allow(clippy::unused_self)]
self.writer
.write_event(Event::End(BytesEnd::new("results")))?; fn write<'a>(
self.writer &self,
.write_event(Event::End(BytesEnd::new("sparql")))?; output: &mut Vec<Event<'a>>,
Ok(self.writer.into_inner()) solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
) {
output.push(Event::Start(BytesStart::new("result")));
for (variable, value) in solution {
output.push(Event::Start(
BytesStart::new("binding").with_attributes([("name", variable.as_str())]),
));
write_xml_term(output, value);
output.push(Event::End(BytesEnd::new("binding")));
} }
output.push(Event::End(BytesEnd::new("result")));
} }
fn write_xml_term( #[allow(clippy::unused_self)]
term: TermRef<'_>, fn finish(self, output: &mut Vec<Event<'_>>) {
writer: &mut Writer<impl Write>, output.push(Event::End(BytesEnd::new("results")));
) -> Result<(), quick_xml::Error> { output.push(Event::End(BytesEnd::new("sparql")));
}
}
fn write_xml_term<'a>(output: &mut Vec<Event<'a>>, term: TermRef<'a>) {
match term { match term {
TermRef::NamedNode(uri) => { TermRef::NamedNode(uri) => {
writer output.push(Event::Start(BytesStart::new("uri")));
.create_element("uri") output.push(Event::Text(BytesText::new(uri.as_str())));
.write_text_content(BytesText::new(uri.as_str()))?; output.push(Event::End(BytesEnd::new("uri")));
} }
TermRef::BlankNode(bnode) => { TermRef::BlankNode(bnode) => {
writer output.push(Event::Start(BytesStart::new("bnode")));
.create_element("bnode") output.push(Event::Text(BytesText::new(bnode.as_str())));
.write_text_content(BytesText::new(bnode.as_str()))?; output.push(Event::End(BytesEnd::new("bnode")));
} }
TermRef::Literal(literal) => { TermRef::Literal(literal) => {
let element = writer.create_element("literal"); let mut start = BytesStart::new("literal");
let element = if let Some(language) = literal.language() { if let Some(language) = literal.language() {
element.with_attribute(("xml:lang", language)) start.push_attribute(("xml:lang", language));
} else if !literal.is_plain() { } else if !literal.is_plain() {
element.with_attribute(("datatype", literal.datatype().as_str())) start.push_attribute(("datatype", literal.datatype().as_str()))
} else { }
element output.push(Event::Start(start));
}; output.push(Event::Text(BytesText::new(literal.value())));
element.write_text_content(BytesText::new(literal.value()))?; output.push(Event::End(BytesEnd::new("literal")));
} }
#[cfg(feature = "rdf-star")] #[cfg(feature = "rdf-star")]
TermRef::Triple(triple) => { TermRef::Triple(triple) => {
writer output.push(Event::Start(BytesStart::new("triple")));
.create_element("triple") output.push(Event::Start(BytesStart::new("subject")));
.write_inner_content(|writer| { write_xml_term(output, triple.subject.as_ref().into());
writer output.push(Event::End(BytesEnd::new("subject")));
.create_element("subject") output.push(Event::Start(BytesStart::new("predicate")));
.write_inner_content(|writer| { write_xml_term(output, triple.predicate.as_ref().into());
write_xml_term(triple.subject.as_ref().into(), writer) output.push(Event::End(BytesEnd::new("predicate")));
})?; output.push(Event::Start(BytesStart::new("object")));
writer write_xml_term(output, triple.object.as_ref());
.create_element("predicate") output.push(Event::End(BytesEnd::new("object")));
.write_inner_content(|writer| { output.push(Event::End(BytesEnd::new("triple")));
write_xml_term(triple.predicate.as_ref().into(), writer)
})?;
writer
.create_element("object")
.write_inner_content(|writer| {
write_xml_term(triple.object.as_ref(), writer)
})?;
quick_xml::Result::Ok(())
})?;
} }
} }
Ok(())
} }
pub enum XmlQueryResultsReader<R: Read> { pub enum XmlQueryResultsReader<R: Read> {

Loading…
Cancel
Save