From 142418137974ce89cae479a76fb186d18a93f6b6 Mon Sep 17 00:00:00 2001 From: Tpt Date: Sun, 25 Feb 2024 17:43:56 +0100 Subject: [PATCH] Support Tokio async in SPARQL TSV results parser --- lib/sparesults/src/csv.rs | 237 +++++++++++++++++++++++++++-------- lib/sparesults/src/parser.rs | 29 +++-- 2 files changed, 205 insertions(+), 61 deletions(-) diff --git a/lib/sparesults/src/csv.rs b/lib/sparesults/src/csv.rs index 02f4df9b..a05aa2da 100644 --- a/lib/sparesults/src/csv.rs +++ b/lib/sparesults/src/csv.rs @@ -9,7 +9,7 @@ use oxrdf::*; use std::io::{self, Read, Write}; use std::str::{self, FromStr}; #[cfg(feature = "async-tokio")] -use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; const MAX_BUFFER_SIZE: usize = 4096 * 4096; @@ -425,72 +425,159 @@ fn is_turtle_double(value: &str) -> bool { (with_before || with_after) && !value.is_empty() && value.iter().all(u8::is_ascii_digit) } -pub enum TsvQueryResultsReader { +pub enum FromReadTsvQueryResultsReader { Solutions { variables: Vec, - solutions: TsvSolutionsReader, + solutions: FromReadTsvSolutionsReader, }, Boolean(bool), } -impl TsvQueryResultsReader { +impl FromReadTsvQueryResultsReader { pub fn read(mut read: R) -> Result { let mut reader = LineReader::new(); let mut buffer = Vec::new(); + let line = reader.next_line(&mut buffer, &mut read)?; + Ok(match inner_read_first_line(reader, line)? { + TsvInnerQueryResults::Solutions { + variables, + solutions, + } => Self::Solutions { + variables, + solutions: FromReadTsvSolutionsReader { + read, + inner: solutions, + buffer, + }, + }, + TsvInnerQueryResults::Boolean(value) => Self::Boolean(value), + }) + } +} - // We read the header - let line = reader - .next_line(&mut buffer, &mut read)? - .trim_matches(|c| matches!(c, ' ' | '\r' | '\n')); - if line.eq_ignore_ascii_case("true") { - return Ok(Self::Boolean(true)); - } - if line.eq_ignore_ascii_case("false") { - return Ok(Self::Boolean(false)); - } - let mut variables = Vec::new(); - if !line.is_empty() { - for v in line.split('\t') { - let v = v.trim(); - if v.is_empty() { - return Err(QueryResultsSyntaxError::msg("Empty column on the first row. The first row should be a list of variables like ?foo or $bar").into()); - } - let variable = Variable::from_str(v).map_err(|e| { - QueryResultsSyntaxError::msg(format!("Invalid variable declaration '{v}': {e}")) - })?; - if variables.contains(&variable) { - return Err(QueryResultsSyntaxError::msg(format!( - "The variable {variable} is declared twice" - )) - .into()); - } - variables.push(variable); - } - } - let column_len = variables.len(); - Ok(Self::Solutions { - variables, - solutions: TsvSolutionsReader { - read, - buffer, - reader, - column_len, +pub struct FromReadTsvSolutionsReader { + read: R, + inner: TsvInnerSolutionsReader, + buffer: Vec, +} + +impl FromReadTsvSolutionsReader { + pub fn read_next(&mut self) -> Result>>, QueryResultsParseError> { + let line = self + .inner + .reader + .next_line(&mut self.buffer, &mut self.read)?; + self.inner.read_next(line) + } +} + +#[cfg(feature = "async-tokio")] +pub enum FromTokioAsyncReadTsvQueryResultsReader { + Solutions { + variables: Vec, + solutions: FromTokioAsyncReadTsvSolutionsReader, + }, + Boolean(bool), +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadTsvQueryResultsReader { + pub async fn read(mut read: R) -> Result { + let mut reader = LineReader::new(); + let mut buffer = Vec::new(); + let line = reader.next_line_tokio_async(&mut buffer, &mut read).await?; + Ok(match inner_read_first_line(reader, line)? { + TsvInnerQueryResults::Solutions { + variables, + solutions, + } => Self::Solutions { + variables, + solutions: FromTokioAsyncReadTsvSolutionsReader { + read, + inner: solutions, + buffer, + }, }, + TsvInnerQueryResults::Boolean(value) => Self::Boolean(value), }) } } -pub struct TsvSolutionsReader { +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadTsvSolutionsReader { read: R, + inner: TsvInnerSolutionsReader, buffer: Vec, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadTsvSolutionsReader { + pub async fn read_next(&mut self) -> Result>>, QueryResultsParseError> { + let line = self + .inner + .reader + .next_line_tokio_async(&mut self.buffer, &mut self.read) + .await?; + self.inner.read_next(line) + } +} + +enum TsvInnerQueryResults { + Solutions { + variables: Vec, + solutions: TsvInnerSolutionsReader, + }, + Boolean(bool), +} + +fn inner_read_first_line( + reader: LineReader, + line: &str, +) -> Result { + let line = line.trim_matches(|c| matches!(c, ' ' | '\r' | '\n')); + if line.eq_ignore_ascii_case("true") { + return Ok(TsvInnerQueryResults::Boolean(true)); + } + if line.eq_ignore_ascii_case("false") { + return Ok(TsvInnerQueryResults::Boolean(false)); + } + let mut variables = Vec::new(); + if !line.is_empty() { + for v in line.split('\t') { + let v = v.trim(); + if v.is_empty() { + return Err(QueryResultsSyntaxError::msg("Empty column on the first row. The first row should be a list of variables like ?foo or $bar").into()); + } + let variable = Variable::from_str(v).map_err(|e| { + QueryResultsSyntaxError::msg(format!("Invalid variable declaration '{v}': {e}")) + })?; + if variables.contains(&variable) { + return Err(QueryResultsSyntaxError::msg(format!( + "The variable {variable} is declared twice" + )) + .into()); + } + variables.push(variable); + } + } + let column_len = variables.len(); + Ok(TsvInnerQueryResults::Solutions { + variables, + solutions: TsvInnerSolutionsReader { reader, column_len }, + }) +} + +struct TsvInnerSolutionsReader { reader: LineReader, column_len: usize, } -impl TsvSolutionsReader { +impl TsvInnerSolutionsReader { #[allow(clippy::unwrap_in_result)] - pub fn read_next(&mut self) -> Result>>, QueryResultsParseError> { - let line = self.reader.next_line(&mut self.buffer, &mut self.read)?; + pub fn read_next( + &self, + line: &str, + ) -> Result>>, QueryResultsParseError> { if line.is_empty() { return Ok(None); // EOF } @@ -620,6 +707,50 @@ impl LineReader { self.buffer_start = line_end; result } + + #[cfg(feature = "async-tokio")] + #[allow(clippy::unwrap_in_result)] + async fn next_line_tokio_async<'a>( + &mut self, + buffer: &'a mut Vec, + read: &mut (impl AsyncRead + Unpin), + ) -> io::Result<&'a str> { + let line_end = loop { + if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) { + break self.buffer_start + eol + 1; + } + if self.buffer_start > 0 { + buffer.copy_within(self.buffer_start..self.buffer_end, 0); + self.buffer_end -= self.buffer_start; + self.buffer_start = 0; + } + if self.buffer_end + 1024 > buffer.len() { + if self.buffer_end + 1024 > MAX_BUFFER_SIZE { + return Err(io::Error::new( + io::ErrorKind::OutOfMemory, + format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"), + )); + } + buffer.resize(self.buffer_end + 1024, b'\0'); + } + let read = read.read(&mut buffer[self.buffer_end..]).await?; + if read == 0 { + break self.buffer_end; + } + self.buffer_end += read; + }; + let result = str::from_utf8(&buffer[self.buffer_start..line_end]).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid UTF-8 in the TSV file: {e}"), + ) + }); + self.line_count += 1; + self.last_line_start = self.last_line_end; + self.last_line_end += u64::try_from(line_end - self.buffer_start).unwrap(); + self.buffer_start = line_end; + result + } } #[cfg(test)] @@ -712,10 +843,10 @@ mod tests { assert_eq!(buffer, "?x\t?literal\n\t\"String\"\n\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n"); // Read - if let TsvQueryResultsReader::Solutions { + if let FromReadTsvQueryResultsReader::Solutions { solutions: mut solutions_iter, variables: actual_variables, - } = TsvQueryResultsReader::read(buffer.as_bytes())? + } = FromReadTsvQueryResultsReader::read(buffer.as_bytes())? { assert_eq!(actual_variables.as_slice(), variables.as_slice()); let mut rows = Vec::new(); @@ -747,8 +878,8 @@ mod tests { let a_lot_of_strings = format!("?p\n{}\n", "<".repeat(100_000)); bad_tsvs.push(&a_lot_of_strings); for bad_tsv in bad_tsvs { - if let Ok(TsvQueryResultsReader::Solutions { mut solutions, .. }) = - TsvQueryResultsReader::read(bad_tsv.as_bytes()) + if let Ok(FromReadTsvQueryResultsReader::Solutions { mut solutions, .. }) = + FromReadTsvQueryResultsReader::read(bad_tsv.as_bytes()) { while let Ok(Some(_)) = solutions.read_next() {} } @@ -773,10 +904,10 @@ mod tests { #[test] fn test_no_columns_tsv_parsing() -> io::Result<()> { - if let TsvQueryResultsReader::Solutions { + if let FromReadTsvQueryResultsReader::Solutions { mut solutions, variables, - } = TsvQueryResultsReader::read(b"\n\n".as_slice())? + } = FromReadTsvQueryResultsReader::read(b"\n\n".as_slice())? { assert_eq!(variables, Vec::::new()); assert_eq!(solutions.read_next()?, Some(Vec::new())); @@ -803,10 +934,10 @@ mod tests { #[test] fn test_no_results_tsv_parsing() -> io::Result<()> { - if let TsvQueryResultsReader::Solutions { + if let FromReadTsvQueryResultsReader::Solutions { mut solutions, variables, - } = TsvQueryResultsReader::read(b"?a\n".as_slice())? + } = FromReadTsvQueryResultsReader::read(b"?a\n".as_slice())? { assert_eq!(variables, vec![Variable::new_unchecked("a")]); assert_eq!(solutions.read_next()?, None); diff --git a/lib/sparesults/src/parser.rs b/lib/sparesults/src/parser.rs index 2ac6d08f..d125008f 100644 --- a/lib/sparesults/src/parser.rs +++ b/lib/sparesults/src/parser.rs @@ -1,4 +1,6 @@ -use crate::csv::{TsvQueryResultsReader, TsvSolutionsReader}; +use crate::csv::{FromReadTsvQueryResultsReader, FromReadTsvSolutionsReader}; +#[cfg(feature = "async-tokio")] +use crate::csv::{FromTokioAsyncReadTsvQueryResultsReader, FromTokioAsyncReadTsvSolutionsReader}; use crate::error::{QueryResultsParseError, QueryResultsSyntaxError}; use crate::format::QueryResultsFormat; use crate::json::{FromReadJsonQueryResultsReader, FromReadJsonSolutionsReader}; @@ -104,9 +106,9 @@ impl QueryResultsParser { }), }, QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()), - QueryResultsFormat::Tsv => match TsvQueryResultsReader::read(reader)? { - TsvQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r), - TsvQueryResultsReader::Solutions { + QueryResultsFormat::Tsv => match FromReadTsvQueryResultsReader::read(reader)? { + FromReadTsvQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r), + FromReadTsvQueryResultsReader::Solutions { solutions, variables, } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { @@ -180,7 +182,16 @@ impl QueryResultsParser { }), }, QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()), - QueryResultsFormat::Tsv => return Err(QueryResultsSyntaxError::msg("The TSV query results parser does not support Tokio AsyncRead yet").into()), + QueryResultsFormat::Tsv => match FromTokioAsyncReadTsvQueryResultsReader::read(reader).await? { + FromTokioAsyncReadTsvQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r), + FromTokioAsyncReadTsvQueryResultsReader::Solutions { + solutions, + variables, + } => FromTokioAsyncReadQueryResultsReader::Solutions(FromTokioAsyncReadSolutionsReader { + variables: variables.into(), + solutions: FromTokioAsyncReadSolutionsReaderKind::Tsv(solutions), + }), + }, }) } } @@ -261,7 +272,7 @@ pub struct FromReadSolutionsReader { enum FromReadSolutionsReaderKind { Xml(FromReadXmlSolutionsReader), Json(FromReadJsonSolutionsReader), - Tsv(TsvSolutionsReader), + Tsv(FromReadTsvSolutionsReader), } impl FromReadSolutionsReader { @@ -313,7 +324,7 @@ impl Iterator for FromReadSolutionsReader { /// It is either a read boolean ([`bool`]) or a streaming reader of a set of solutions ([`FromReadSolutionsReader`]). /// /// Example in TSV (the API is the same for JSON and XML): -/// ```no_run +/// ``` /// use oxrdf::{Literal, Variable}; /// use sparesults::{ /// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser, @@ -393,6 +404,7 @@ pub struct FromTokioAsyncReadSolutionsReader { enum FromTokioAsyncReadSolutionsReaderKind { Json(FromTokioAsyncReadJsonSolutionsReader), Xml(FromTokioAsyncReadXmlSolutionsReader), + Tsv(FromTokioAsyncReadTsvSolutionsReader), } #[cfg(feature = "async-tokio")] @@ -400,7 +412,7 @@ impl FromTokioAsyncReadSolutionsReader { /// Ordered list of the declared variables at the beginning of the results. /// /// Example in TSV (the API is the same for JSON and XML): - /// ```no_run + /// ``` /// use oxrdf::Variable; /// use sparesults::{ /// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser, @@ -435,6 +447,7 @@ impl FromTokioAsyncReadSolutionsReader { match &mut self.solutions { FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await, FromTokioAsyncReadSolutionsReaderKind::Xml(reader) => reader.read_next().await, + FromTokioAsyncReadSolutionsReaderKind::Tsv(reader) => reader.read_next().await, } .transpose()? .map(|values| (Arc::clone(&self.variables), values).into()),