diff --git a/Cargo.lock b/Cargo.lock index a958c167..93c81493 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1673,6 +1673,7 @@ name = "sparesults" version = "0.2.0-alpha.1-dev" dependencies = [ "json-event-parser", + "memchr", "oxrdf", "quick-xml", ] diff --git a/lib/sparesults/Cargo.toml b/lib/sparesults/Cargo.toml index 0a7904db..3fa9e27e 100644 --- a/lib/sparesults/Cargo.toml +++ b/lib/sparesults/Cargo.toml @@ -20,6 +20,7 @@ rdf-star = ["oxrdf/rdf-star"] [dependencies] json-event-parser = "0.1" +memchr = "2.5" oxrdf = { version = "0.2.0-alpha.1-dev", path="../oxrdf" } quick-xml = ">=0.29, <0.31" diff --git a/lib/sparesults/src/csv.rs b/lib/sparesults/src/csv.rs index 72adac29..6fbfeb40 100644 --- a/lib/sparesults/src/csv.rs +++ b/lib/sparesults/src/csv.rs @@ -1,10 +1,13 @@ //! Implementation of [SPARQL 1.1 Query Results CSV and TSV Formats](https://www.w3.org/TR/sparql11-results-csv-tsv/) use crate::error::{ParseError, SyntaxError, SyntaxErrorKind}; +use memchr::memchr; use oxrdf::Variable; use oxrdf::{vocab::xsd, *}; -use std::io::{self, BufRead, Write}; -use std::str::FromStr; +use std::io::{self, BufRead, Read, Write}; +use std::str::{self, FromStr}; + +const MAX_BUFFER_SIZE: usize = 4096 * 4096; pub fn write_boolean_csv_result(mut sink: W, value: bool) -> io::Result { sink.write_all(if value { b"true" } else { b"false" })?; @@ -271,7 +274,7 @@ fn is_turtle_double(value: &str) -> bool { (with_before || with_after) && !value.is_empty() && value.iter().all(u8::is_ascii_digit) } -pub enum TsvQueryResultsReader { +pub enum TsvQueryResultsReader { Solutions { variables: Vec, solutions: TsvSolutionsReader, @@ -279,14 +282,13 @@ pub enum TsvQueryResultsReader { Boolean(bool), } -impl TsvQueryResultsReader { - pub fn read(mut source: R) -> Result { - let mut buffer = String::new(); +impl TsvQueryResultsReader { + pub fn read(read: R) -> Result { + let mut reader = LineReader::new(read); // We read the header - source.read_line(&mut buffer)?; - let line = buffer - .as_str() + let line = reader + .next_line()? .trim_matches(|c| matches!(c, ' ' | '\r' | '\n')); if line.eq_ignore_ascii_case("true") { return Ok(Self::Boolean(true)); @@ -316,29 +318,23 @@ impl TsvQueryResultsReader { let column_len = variables.len(); Ok(Self::Solutions { variables, - solutions: TsvSolutionsReader { - source, - buffer, - column_len, - }, + solutions: TsvSolutionsReader { reader, column_len }, }) } } -pub struct TsvSolutionsReader { - source: R, - buffer: String, +pub struct TsvSolutionsReader { + reader: LineReader, column_len: usize, } impl TsvSolutionsReader { pub fn read_next(&mut self) -> Result>>, ParseError> { - self.buffer.clear(); - if self.source.read_line(&mut self.buffer)? == 0 { - return Ok(None); + let line = self.reader.next_line()?; + if line.is_empty() { + return Ok(None); // EOF } - let elements = self - .buffer + let elements = line .split('\t') .map(|v| { let v = v.trim(); @@ -346,7 +342,10 @@ impl TsvSolutionsReader { Ok(None) } else { Ok(Some(Term::from_str(v).map_err(|e| SyntaxError { - inner: SyntaxErrorKind::Term(e), + inner: SyntaxErrorKind::Term { + error: e, + term: v.into(), + }, })?)) } }) @@ -357,16 +356,67 @@ impl TsvSolutionsReader { Ok(Some(Vec::new())) // Zero columns case } else { Err(SyntaxError::msg(format!( - "This TSV files has {} columns but we found a row with {} columns: {:?}", + "This TSV files has {} columns but we found a row with {} columns: {}", self.column_len, elements.len(), - self.buffer + line )) .into()) } } } +struct LineReader { + read: R, + buffer: Vec, + start: usize, + end: usize, +} + +impl LineReader { + fn new(read: R) -> Self { + Self { + read, + buffer: Vec::new(), + start: 0, + end: 0, + } + } + + fn next_line(&mut self) -> io::Result<&str> { + self.buffer.copy_within(self.start..self.end, 0); + self.end -= self.start; + self.start = 0; + let line_end = loop { + if let Some(eol) = memchr(b'\n', &self.buffer[self.start..self.end]) { + break self.start + eol + 1; + } + if self.end + 1024 > self.buffer.len() { + if self.end + 1024 > MAX_BUFFER_SIZE { + return Err(io::Error::new( + io::ErrorKind::OutOfMemory, + format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"), + )); + } + self.buffer.resize(self.end + 1024, b'\0'); + } + let read = self.read.read(&mut self.buffer[self.end..])?; + if read == 0 { + break self.end; + } + self.end += read; + }; + let result = str::from_utf8(&self.buffer[self.start..line_end]).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid UTF-8 in the TSV file: {e}"), + ) + }); + self.start = line_end; + result + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lib/sparesults/src/error.rs b/lib/sparesults/src/error.rs index d150e847..40510663 100644 --- a/lib/sparesults/src/error.rs +++ b/lib/sparesults/src/error.rs @@ -78,9 +78,9 @@ pub struct SyntaxError { } #[derive(Debug)] -pub enum SyntaxErrorKind { +pub(crate) enum SyntaxErrorKind { Xml(quick_xml::Error), - Term(TermParseError), + Term { error: TermParseError, term: String }, Msg { msg: String }, } @@ -99,7 +99,7 @@ impl fmt::Display for SyntaxError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.inner { SyntaxErrorKind::Xml(e) => e.fmt(f), - SyntaxErrorKind::Term(e) => e.fmt(f), + SyntaxErrorKind::Term { error, term } => write!(f, "{error}: {term}"), SyntaxErrorKind::Msg { msg } => f.write_str(msg), } } @@ -110,7 +110,7 @@ impl Error for SyntaxError { fn source(&self) -> Option<&(dyn Error + 'static)> { match &self.inner { SyntaxErrorKind::Xml(e) => Some(e), - SyntaxErrorKind::Term(e) => Some(e), + SyntaxErrorKind::Term { error, .. } => Some(error), SyntaxErrorKind::Msg { .. } => None, } } @@ -130,7 +130,9 @@ impl From for io::Error { } _ => Self::new(io::ErrorKind::InvalidData, error), }, - SyntaxErrorKind::Term(error) => Self::new(io::ErrorKind::InvalidData, error), + SyntaxErrorKind::Term { .. } => { + Self::new(io::ErrorKind::InvalidData, error.to_string()) + } SyntaxErrorKind::Msg { msg } => Self::new(io::ErrorKind::InvalidData, msg), } }