Makes SPARQL results TSV work with a Read implementation

pull/635/head
Tpt 1 year ago committed by Thomas Tanon
parent 412ca37b3c
commit 67fd726f9d
  1. 1
      Cargo.lock
  2. 1
      lib/sparesults/Cargo.toml
  3. 100
      lib/sparesults/src/csv.rs
  4. 12
      lib/sparesults/src/error.rs

1
Cargo.lock generated

@ -1673,6 +1673,7 @@ name = "sparesults"
version = "0.2.0-alpha.1-dev" version = "0.2.0-alpha.1-dev"
dependencies = [ dependencies = [
"json-event-parser", "json-event-parser",
"memchr",
"oxrdf", "oxrdf",
"quick-xml", "quick-xml",
] ]

@ -20,6 +20,7 @@ rdf-star = ["oxrdf/rdf-star"]
[dependencies] [dependencies]
json-event-parser = "0.1" json-event-parser = "0.1"
memchr = "2.5"
oxrdf = { version = "0.2.0-alpha.1-dev", path="../oxrdf" } oxrdf = { version = "0.2.0-alpha.1-dev", path="../oxrdf" }
quick-xml = ">=0.29, <0.31" quick-xml = ">=0.29, <0.31"

@ -1,10 +1,13 @@
//! Implementation of [SPARQL 1.1 Query Results CSV and TSV Formats](https://www.w3.org/TR/sparql11-results-csv-tsv/) //! Implementation of [SPARQL 1.1 Query Results CSV and TSV Formats](https://www.w3.org/TR/sparql11-results-csv-tsv/)
use crate::error::{ParseError, SyntaxError, SyntaxErrorKind}; use crate::error::{ParseError, SyntaxError, SyntaxErrorKind};
use memchr::memchr;
use oxrdf::Variable; use oxrdf::Variable;
use oxrdf::{vocab::xsd, *}; use oxrdf::{vocab::xsd, *};
use std::io::{self, BufRead, Write}; use std::io::{self, BufRead, Read, Write};
use std::str::FromStr; use std::str::{self, FromStr};
const MAX_BUFFER_SIZE: usize = 4096 * 4096;
pub fn write_boolean_csv_result<W: Write>(mut sink: W, value: bool) -> io::Result<W> { pub fn write_boolean_csv_result<W: Write>(mut sink: W, value: bool) -> io::Result<W> {
sink.write_all(if value { b"true" } else { b"false" })?; sink.write_all(if value { b"true" } else { b"false" })?;
@ -271,7 +274,7 @@ fn is_turtle_double(value: &str) -> bool {
(with_before || with_after) && !value.is_empty() && value.iter().all(u8::is_ascii_digit) (with_before || with_after) && !value.is_empty() && value.iter().all(u8::is_ascii_digit)
} }
pub enum TsvQueryResultsReader<R: BufRead> { pub enum TsvQueryResultsReader<R: Read> {
Solutions { Solutions {
variables: Vec<Variable>, variables: Vec<Variable>,
solutions: TsvSolutionsReader<R>, solutions: TsvSolutionsReader<R>,
@ -279,14 +282,13 @@ pub enum TsvQueryResultsReader<R: BufRead> {
Boolean(bool), Boolean(bool),
} }
impl<R: BufRead> TsvQueryResultsReader<R> { impl<R: Read> TsvQueryResultsReader<R> {
pub fn read(mut source: R) -> Result<Self, ParseError> { pub fn read(read: R) -> Result<Self, ParseError> {
let mut buffer = String::new(); let mut reader = LineReader::new(read);
// We read the header // We read the header
source.read_line(&mut buffer)?; let line = reader
let line = buffer .next_line()?
.as_str()
.trim_matches(|c| matches!(c, ' ' | '\r' | '\n')); .trim_matches(|c| matches!(c, ' ' | '\r' | '\n'));
if line.eq_ignore_ascii_case("true") { if line.eq_ignore_ascii_case("true") {
return Ok(Self::Boolean(true)); return Ok(Self::Boolean(true));
@ -316,29 +318,23 @@ impl<R: BufRead> TsvQueryResultsReader<R> {
let column_len = variables.len(); let column_len = variables.len();
Ok(Self::Solutions { Ok(Self::Solutions {
variables, variables,
solutions: TsvSolutionsReader { solutions: TsvSolutionsReader { reader, column_len },
source,
buffer,
column_len,
},
}) })
} }
} }
pub struct TsvSolutionsReader<R: BufRead> { pub struct TsvSolutionsReader<R: Read> {
source: R, reader: LineReader<R>,
buffer: String,
column_len: usize, column_len: usize,
} }
impl<R: BufRead> TsvSolutionsReader<R> { impl<R: BufRead> TsvSolutionsReader<R> {
pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, ParseError> { pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, ParseError> {
self.buffer.clear(); let line = self.reader.next_line()?;
if self.source.read_line(&mut self.buffer)? == 0 { if line.is_empty() {
return Ok(None); return Ok(None); // EOF
} }
let elements = self let elements = line
.buffer
.split('\t') .split('\t')
.map(|v| { .map(|v| {
let v = v.trim(); let v = v.trim();
@ -346,7 +342,10 @@ impl<R: BufRead> TsvSolutionsReader<R> {
Ok(None) Ok(None)
} else { } else {
Ok(Some(Term::from_str(v).map_err(|e| SyntaxError { Ok(Some(Term::from_str(v).map_err(|e| SyntaxError {
inner: SyntaxErrorKind::Term(e), inner: SyntaxErrorKind::Term {
error: e,
term: v.into(),
},
})?)) })?))
} }
}) })
@ -357,16 +356,67 @@ impl<R: BufRead> TsvSolutionsReader<R> {
Ok(Some(Vec::new())) // Zero columns case Ok(Some(Vec::new())) // Zero columns case
} else { } else {
Err(SyntaxError::msg(format!( Err(SyntaxError::msg(format!(
"This TSV files has {} columns but we found a row with {} columns: {:?}", "This TSV files has {} columns but we found a row with {} columns: {}",
self.column_len, self.column_len,
elements.len(), elements.len(),
self.buffer line
)) ))
.into()) .into())
} }
} }
} }
struct LineReader<R: Read> {
read: R,
buffer: Vec<u8>,
start: usize,
end: usize,
}
impl<R: Read> LineReader<R> {
fn new(read: R) -> Self {
Self {
read,
buffer: Vec::new(),
start: 0,
end: 0,
}
}
fn next_line(&mut self) -> io::Result<&str> {
self.buffer.copy_within(self.start..self.end, 0);
self.end -= self.start;
self.start = 0;
let line_end = loop {
if let Some(eol) = memchr(b'\n', &self.buffer[self.start..self.end]) {
break self.start + eol + 1;
}
if self.end + 1024 > self.buffer.len() {
if self.end + 1024 > MAX_BUFFER_SIZE {
return Err(io::Error::new(
io::ErrorKind::OutOfMemory,
format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"),
));
}
self.buffer.resize(self.end + 1024, b'\0');
}
let read = self.read.read(&mut self.buffer[self.end..])?;
if read == 0 {
break self.end;
}
self.end += read;
};
let result = str::from_utf8(&self.buffer[self.start..line_end]).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid UTF-8 in the TSV file: {e}"),
)
});
self.start = line_end;
result
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

@ -78,9 +78,9 @@ pub struct SyntaxError {
} }
#[derive(Debug)] #[derive(Debug)]
pub enum SyntaxErrorKind { pub(crate) enum SyntaxErrorKind {
Xml(quick_xml::Error), Xml(quick_xml::Error),
Term(TermParseError), Term { error: TermParseError, term: String },
Msg { msg: String }, Msg { msg: String },
} }
@ -99,7 +99,7 @@ impl fmt::Display for SyntaxError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.inner { match &self.inner {
SyntaxErrorKind::Xml(e) => e.fmt(f), SyntaxErrorKind::Xml(e) => e.fmt(f),
SyntaxErrorKind::Term(e) => e.fmt(f), SyntaxErrorKind::Term { error, term } => write!(f, "{error}: {term}"),
SyntaxErrorKind::Msg { msg } => f.write_str(msg), SyntaxErrorKind::Msg { msg } => f.write_str(msg),
} }
} }
@ -110,7 +110,7 @@ impl Error for SyntaxError {
fn source(&self) -> Option<&(dyn Error + 'static)> { fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self.inner { match &self.inner {
SyntaxErrorKind::Xml(e) => Some(e), SyntaxErrorKind::Xml(e) => Some(e),
SyntaxErrorKind::Term(e) => Some(e), SyntaxErrorKind::Term { error, .. } => Some(error),
SyntaxErrorKind::Msg { .. } => None, SyntaxErrorKind::Msg { .. } => None,
} }
} }
@ -130,7 +130,9 @@ impl From<SyntaxError> for io::Error {
} }
_ => Self::new(io::ErrorKind::InvalidData, error), _ => Self::new(io::ErrorKind::InvalidData, error),
}, },
SyntaxErrorKind::Term(error) => Self::new(io::ErrorKind::InvalidData, error), SyntaxErrorKind::Term { .. } => {
Self::new(io::ErrorKind::InvalidData, error.to_string())
}
SyntaxErrorKind::Msg { msg } => Self::new(io::ErrorKind::InvalidData, msg), SyntaxErrorKind::Msg { msg } => Self::new(io::ErrorKind::InvalidData, msg),
} }
} }

Loading…
Cancel
Save