Support Tokio async in SPARQL TSV results parser

pull/803/head
Tpt 10 months ago committed by Thomas Tanon
parent 01d73fa62d
commit 1424181379
  1. 195
      lib/sparesults/src/csv.rs
  2. 29
      lib/sparesults/src/parser.rs

@ -9,7 +9,7 @@ use oxrdf::*;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::str::{self, FromStr}; use std::str::{self, FromStr};
#[cfg(feature = "async-tokio")] #[cfg(feature = "async-tokio")]
use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
const MAX_BUFFER_SIZE: usize = 4096 * 4096; const MAX_BUFFER_SIZE: usize = 4096 * 4096;
@ -425,28 +425,121 @@ fn is_turtle_double(value: &str) -> bool {
(with_before || with_after) && !value.is_empty() && value.iter().all(u8::is_ascii_digit) (with_before || with_after) && !value.is_empty() && value.iter().all(u8::is_ascii_digit)
} }
pub enum TsvQueryResultsReader<R: Read> { pub enum FromReadTsvQueryResultsReader<R: Read> {
Solutions { Solutions {
variables: Vec<Variable>, variables: Vec<Variable>,
solutions: TsvSolutionsReader<R>, solutions: FromReadTsvSolutionsReader<R>,
}, },
Boolean(bool), Boolean(bool),
} }
impl<R: Read> TsvQueryResultsReader<R> { impl<R: Read> FromReadTsvQueryResultsReader<R> {
pub fn read(mut read: R) -> Result<Self, QueryResultsParseError> { pub fn read(mut read: R) -> Result<Self, QueryResultsParseError> {
let mut reader = LineReader::new(); let mut reader = LineReader::new();
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let line = reader.next_line(&mut buffer, &mut read)?;
Ok(match inner_read_first_line(reader, line)? {
TsvInnerQueryResults::Solutions {
variables,
solutions,
} => Self::Solutions {
variables,
solutions: FromReadTsvSolutionsReader {
read,
inner: solutions,
buffer,
},
},
TsvInnerQueryResults::Boolean(value) => Self::Boolean(value),
})
}
}
/// Reader of TSV solution rows backed by a blocking [`Read`] source.
///
/// It pairs the byte source and its line buffer with the I/O-independent row
/// parser ([`TsvInnerSolutionsReader`]).
pub struct FromReadTsvSolutionsReader<R: Read> {
/// Underlying byte source the lines are pulled from.
read: R,
/// Row parser; it also owns the line reader state used to split the input into lines.
inner: TsvInnerSolutionsReader,
/// Byte buffer handed to the line reader on every call; returned lines borrow from it.
buffer: Vec<u8>,
}
impl<R: Read> FromReadTsvSolutionsReader<R> {
    /// Pulls the next line from the underlying reader and parses it as one solution row.
    ///
    /// The line is fetched with the line reader owned by the inner parser, then
    /// handed to that parser, whose result is returned unchanged.
    ///
    /// # Errors
    /// Fails if reading the line fails (the I/O error is converted by `?`) or if
    /// the inner parser rejects the line.
    pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
        // Split the borrow so the line (borrowed from `buffer`) can coexist with `inner`.
        let Self {
            read,
            inner,
            buffer,
        } = self;
        let next_line = inner.reader.next_line(buffer, read)?;
        inner.read_next(next_line)
    }
}
#[cfg(feature = "async-tokio")]
/// Outcome of starting to parse TSV query results from a Tokio [`AsyncRead`].
///
/// Only available when the `async-tokio` feature is enabled.
pub enum FromTokioAsyncReadTsvQueryResultsReader<R: AsyncRead + Unpin> {
/// A set of solutions: the variables found on the first line plus a reader for the rows.
Solutions {
/// Variables parsed from the first line.
variables: Vec<Variable>,
/// Asynchronous reader yielding the remaining rows one at a time.
solutions: FromTokioAsyncReadTsvSolutionsReader<R>,
},
/// A boolean result.
Boolean(bool),
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadTsvQueryResultsReader<R> {
    /// Starts parsing TSV query results from a Tokio `AsyncRead`.
    ///
    /// Only the first line is consumed here. It is passed to
    /// `inner_read_first_line`, which decides between a boolean result and a
    /// set of solutions. For solutions, the byte source and the line buffer are
    /// moved into the returned solutions reader so later rows can be read from
    /// the same state.
    ///
    /// # Errors
    /// Propagates any failure from reading the first line or from
    /// `inner_read_first_line`.
    pub async fn read(mut read: R) -> Result<Self, QueryResultsParseError> {
        let mut buffer = Vec::new();
        let mut line_reader = LineReader::new();
        let first_line = line_reader
            .next_line_tokio_async(&mut buffer, &mut read)
            .await?;
        let parsed = inner_read_first_line(line_reader, first_line)?;
        Ok(match parsed {
            TsvInnerQueryResults::Boolean(value) => Self::Boolean(value),
            TsvInnerQueryResults::Solutions {
                variables,
                solutions: inner,
            } => Self::Solutions {
                variables,
                solutions: FromTokioAsyncReadTsvSolutionsReader {
                    read,
                    inner,
                    buffer,
                },
            },
        })
    }
}
#[cfg(feature = "async-tokio")]
/// Reader of TSV solution rows backed by a Tokio [`AsyncRead`] source.
///
/// It pairs the byte source and its line buffer with the I/O-independent row
/// parser ([`TsvInnerSolutionsReader`]).
pub struct FromTokioAsyncReadTsvSolutionsReader<R: AsyncRead + Unpin> {
/// Underlying asynchronous byte source the lines are pulled from.
read: R,
/// Row parser; it also owns the line reader state used to split the input into lines.
inner: TsvInnerSolutionsReader,
/// Byte buffer handed to the line reader on every call; returned lines borrow from it.
buffer: Vec<u8>,
}
// We read the header #[cfg(feature = "async-tokio")]
let line = reader impl<R: AsyncRead + Unpin> FromTokioAsyncReadTsvSolutionsReader<R> {
.next_line(&mut buffer, &mut read)? pub async fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
.trim_matches(|c| matches!(c, ' ' | '\r' | '\n')); let line = self
.inner
.reader
.next_line_tokio_async(&mut self.buffer, &mut self.read)
.await?;
self.inner.read_next(line)
}
}
/// I/O-independent result of interpreting the first line of a TSV results document.
///
/// Produced by `inner_read_first_line`; the blocking and Tokio readers both
/// convert it into their own public result type by attaching the byte source
/// and the line buffer.
enum TsvInnerQueryResults {
/// The document holds solutions: the parsed variables and the row parser state.
Solutions {
/// Variables parsed from the first line.
variables: Vec<Variable>,
/// Row parser carrying the line reader and the expected column count.
solutions: TsvInnerSolutionsReader,
},
/// The first line was `true` or `false` (compared ignoring ASCII case).
Boolean(bool),
}
fn inner_read_first_line(
reader: LineReader,
line: &str,
) -> Result<TsvInnerQueryResults, QueryResultsParseError> {
let line = line.trim_matches(|c| matches!(c, ' ' | '\r' | '\n'));
if line.eq_ignore_ascii_case("true") { if line.eq_ignore_ascii_case("true") {
return Ok(Self::Boolean(true)); return Ok(TsvInnerQueryResults::Boolean(true));
} }
if line.eq_ignore_ascii_case("false") { if line.eq_ignore_ascii_case("false") {
return Ok(Self::Boolean(false)); return Ok(TsvInnerQueryResults::Boolean(false));
} }
let mut variables = Vec::new(); let mut variables = Vec::new();
if !line.is_empty() { if !line.is_empty() {
@ -468,29 +561,23 @@ impl<R: Read> TsvQueryResultsReader<R> {
} }
} }
let column_len = variables.len(); let column_len = variables.len();
Ok(Self::Solutions { Ok(TsvInnerQueryResults::Solutions {
variables, variables,
solutions: TsvSolutionsReader { solutions: TsvInnerSolutionsReader { reader, column_len },
read,
buffer,
reader,
column_len,
},
}) })
} }
}
pub struct TsvSolutionsReader<R: Read> { struct TsvInnerSolutionsReader {
read: R,
buffer: Vec<u8>,
reader: LineReader, reader: LineReader,
column_len: usize, column_len: usize,
} }
impl<R: Read> TsvSolutionsReader<R> { impl TsvInnerSolutionsReader {
#[allow(clippy::unwrap_in_result)] #[allow(clippy::unwrap_in_result)]
pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> { pub fn read_next(
let line = self.reader.next_line(&mut self.buffer, &mut self.read)?; &self,
line: &str,
) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
if line.is_empty() { if line.is_empty() {
return Ok(None); // EOF return Ok(None); // EOF
} }
@ -620,6 +707,50 @@ impl LineReader {
self.buffer_start = line_end; self.buffer_start = line_end;
result result
} }
#[cfg(feature = "async-tokio")]
#[allow(clippy::unwrap_in_result)]
/// Asynchronously returns the next line of input as a string slice borrowed from `buffer`.
///
/// The returned line includes its trailing `\n` when one was found. If the
/// source reports end of input first, whatever is still buffered is returned,
/// which is the empty string when nothing is left.
///
/// The line counter and line byte offsets are advanced on every call that gets
/// past the read loop, even when the bytes are not valid UTF-8.
///
/// # Errors
/// - `OutOfMemory` if holding the current line would need more than `MAX_BUFFER_SIZE` bytes.
/// - `InvalidData` if the line is not valid UTF-8.
/// - Any error returned by the underlying reader.
async fn next_line_tokio_async<'a>(
    &mut self,
    buffer: &'a mut Vec<u8>,
    read: &mut (impl AsyncRead + Unpin),
) -> io::Result<&'a str> {
    let line_end = loop {
        // A complete line may already be buffered from a previous read.
        let pending = &buffer[self.buffer_start..self.buffer_end];
        if let Some(newline_offset) = memchr(b'\n', pending) {
            break self.buffer_start + newline_offset + 1;
        }
        // Move the unfinished line to the front so consumed bytes are reclaimed.
        if self.buffer_start > 0 {
            buffer.copy_within(self.buffer_start..self.buffer_end, 0);
            self.buffer_end -= self.buffer_start;
            self.buffer_start = 0;
        }
        // Keep at least 1024 writable bytes after the buffered data, up to the cap.
        let wanted_len = self.buffer_end + 1024;
        if wanted_len > buffer.len() {
            if wanted_len > MAX_BUFFER_SIZE {
                return Err(io::Error::new(
                    io::ErrorKind::OutOfMemory,
                    format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"),
                ));
            }
            buffer.resize(wanted_len, b'\0');
        }
        match read.read(&mut buffer[self.buffer_end..]).await? {
            // End of input: the last line has no trailing newline (or is empty).
            0 => break self.buffer_end,
            bytes_read => self.buffer_end += bytes_read,
        }
    };
    let line_start = self.buffer_start;
    // Position bookkeeping happens regardless of whether the bytes decode as UTF-8.
    self.line_count += 1;
    self.last_line_start = self.last_line_end;
    self.last_line_end += u64::try_from(line_end - line_start).unwrap();
    self.buffer_start = line_end;
    str::from_utf8(&buffer[line_start..line_end]).map_err(|e| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Invalid UTF-8 in the TSV file: {e}"),
        )
    })
}
} }
#[cfg(test)] #[cfg(test)]
@ -712,10 +843,10 @@ mod tests {
assert_eq!(buffer, "?x\t?literal\n<http://example/x>\t\"String\"\n<http://example/x>\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n<http://example/x>\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n"); assert_eq!(buffer, "?x\t?literal\n<http://example/x>\t\"String\"\n<http://example/x>\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n<http://example/x>\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n");
// Read // Read
if let TsvQueryResultsReader::Solutions { if let FromReadTsvQueryResultsReader::Solutions {
solutions: mut solutions_iter, solutions: mut solutions_iter,
variables: actual_variables, variables: actual_variables,
} = TsvQueryResultsReader::read(buffer.as_bytes())? } = FromReadTsvQueryResultsReader::read(buffer.as_bytes())?
{ {
assert_eq!(actual_variables.as_slice(), variables.as_slice()); assert_eq!(actual_variables.as_slice(), variables.as_slice());
let mut rows = Vec::new(); let mut rows = Vec::new();
@ -747,8 +878,8 @@ mod tests {
let a_lot_of_strings = format!("?p\n{}\n", "<".repeat(100_000)); let a_lot_of_strings = format!("?p\n{}\n", "<".repeat(100_000));
bad_tsvs.push(&a_lot_of_strings); bad_tsvs.push(&a_lot_of_strings);
for bad_tsv in bad_tsvs { for bad_tsv in bad_tsvs {
if let Ok(TsvQueryResultsReader::Solutions { mut solutions, .. }) = if let Ok(FromReadTsvQueryResultsReader::Solutions { mut solutions, .. }) =
TsvQueryResultsReader::read(bad_tsv.as_bytes()) FromReadTsvQueryResultsReader::read(bad_tsv.as_bytes())
{ {
while let Ok(Some(_)) = solutions.read_next() {} while let Ok(Some(_)) = solutions.read_next() {}
} }
@ -773,10 +904,10 @@ mod tests {
#[test] #[test]
fn test_no_columns_tsv_parsing() -> io::Result<()> { fn test_no_columns_tsv_parsing() -> io::Result<()> {
if let TsvQueryResultsReader::Solutions { if let FromReadTsvQueryResultsReader::Solutions {
mut solutions, mut solutions,
variables, variables,
} = TsvQueryResultsReader::read(b"\n\n".as_slice())? } = FromReadTsvQueryResultsReader::read(b"\n\n".as_slice())?
{ {
assert_eq!(variables, Vec::<Variable>::new()); assert_eq!(variables, Vec::<Variable>::new());
assert_eq!(solutions.read_next()?, Some(Vec::new())); assert_eq!(solutions.read_next()?, Some(Vec::new()));
@ -803,10 +934,10 @@ mod tests {
#[test] #[test]
fn test_no_results_tsv_parsing() -> io::Result<()> { fn test_no_results_tsv_parsing() -> io::Result<()> {
if let TsvQueryResultsReader::Solutions { if let FromReadTsvQueryResultsReader::Solutions {
mut solutions, mut solutions,
variables, variables,
} = TsvQueryResultsReader::read(b"?a\n".as_slice())? } = FromReadTsvQueryResultsReader::read(b"?a\n".as_slice())?
{ {
assert_eq!(variables, vec![Variable::new_unchecked("a")]); assert_eq!(variables, vec![Variable::new_unchecked("a")]);
assert_eq!(solutions.read_next()?, None); assert_eq!(solutions.read_next()?, None);

@ -1,4 +1,6 @@
use crate::csv::{TsvQueryResultsReader, TsvSolutionsReader}; use crate::csv::{FromReadTsvQueryResultsReader, FromReadTsvSolutionsReader};
#[cfg(feature = "async-tokio")]
use crate::csv::{FromTokioAsyncReadTsvQueryResultsReader, FromTokioAsyncReadTsvSolutionsReader};
use crate::error::{QueryResultsParseError, QueryResultsSyntaxError}; use crate::error::{QueryResultsParseError, QueryResultsSyntaxError};
use crate::format::QueryResultsFormat; use crate::format::QueryResultsFormat;
use crate::json::{FromReadJsonQueryResultsReader, FromReadJsonSolutionsReader}; use crate::json::{FromReadJsonQueryResultsReader, FromReadJsonSolutionsReader};
@ -104,9 +106,9 @@ impl QueryResultsParser {
}), }),
}, },
QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()), QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()),
QueryResultsFormat::Tsv => match TsvQueryResultsReader::read(reader)? { QueryResultsFormat::Tsv => match FromReadTsvQueryResultsReader::read(reader)? {
TsvQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r), FromReadTsvQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r),
TsvQueryResultsReader::Solutions { FromReadTsvQueryResultsReader::Solutions {
solutions, solutions,
variables, variables,
} => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader { } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
@ -180,7 +182,16 @@ impl QueryResultsParser {
}), }),
}, },
QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()), QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()),
QueryResultsFormat::Tsv => return Err(QueryResultsSyntaxError::msg("The TSV query results parser does not support Tokio AsyncRead yet").into()), QueryResultsFormat::Tsv => match FromTokioAsyncReadTsvQueryResultsReader::read(reader).await? {
FromTokioAsyncReadTsvQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r),
FromTokioAsyncReadTsvQueryResultsReader::Solutions {
solutions,
variables,
} => FromTokioAsyncReadQueryResultsReader::Solutions(FromTokioAsyncReadSolutionsReader {
variables: variables.into(),
solutions: FromTokioAsyncReadSolutionsReaderKind::Tsv(solutions),
}),
},
}) })
} }
} }
@ -261,7 +272,7 @@ pub struct FromReadSolutionsReader<R: Read> {
enum FromReadSolutionsReaderKind<R: Read> { enum FromReadSolutionsReaderKind<R: Read> {
Xml(FromReadXmlSolutionsReader<R>), Xml(FromReadXmlSolutionsReader<R>),
Json(FromReadJsonSolutionsReader<R>), Json(FromReadJsonSolutionsReader<R>),
Tsv(TsvSolutionsReader<R>), Tsv(FromReadTsvSolutionsReader<R>),
} }
impl<R: Read> FromReadSolutionsReader<R> { impl<R: Read> FromReadSolutionsReader<R> {
@ -313,7 +324,7 @@ impl<R: Read> Iterator for FromReadSolutionsReader<R> {
/// It is either a read boolean ([`bool`]) or a streaming reader of a set of solutions ([`FromReadSolutionsReader`]). /// It is either a read boolean ([`bool`]) or a streaming reader of a set of solutions ([`FromReadSolutionsReader`]).
/// ///
/// Example in TSV (the API is the same for JSON and XML): /// Example in TSV (the API is the same for JSON and XML):
/// ```no_run /// ```
/// use oxrdf::{Literal, Variable}; /// use oxrdf::{Literal, Variable};
/// use sparesults::{ /// use sparesults::{
/// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser, /// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser,
@ -393,6 +404,7 @@ pub struct FromTokioAsyncReadSolutionsReader<R: AsyncRead + Unpin> {
enum FromTokioAsyncReadSolutionsReaderKind<R: AsyncRead + Unpin> { enum FromTokioAsyncReadSolutionsReaderKind<R: AsyncRead + Unpin> {
Json(FromTokioAsyncReadJsonSolutionsReader<R>), Json(FromTokioAsyncReadJsonSolutionsReader<R>),
Xml(FromTokioAsyncReadXmlSolutionsReader<R>), Xml(FromTokioAsyncReadXmlSolutionsReader<R>),
Tsv(FromTokioAsyncReadTsvSolutionsReader<R>),
} }
#[cfg(feature = "async-tokio")] #[cfg(feature = "async-tokio")]
@ -400,7 +412,7 @@ impl<R: AsyncRead + Unpin> FromTokioAsyncReadSolutionsReader<R> {
/// Ordered list of the declared variables at the beginning of the results. /// Ordered list of the declared variables at the beginning of the results.
/// ///
/// Example in TSV (the API is the same for JSON and XML): /// Example in TSV (the API is the same for JSON and XML):
/// ```no_run /// ```
/// use oxrdf::Variable; /// use oxrdf::Variable;
/// use sparesults::{ /// use sparesults::{
/// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser, /// FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser,
@ -435,6 +447,7 @@ impl<R: AsyncRead + Unpin> FromTokioAsyncReadSolutionsReader<R> {
match &mut self.solutions { match &mut self.solutions {
FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await, FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await,
FromTokioAsyncReadSolutionsReaderKind::Xml(reader) => reader.read_next().await, FromTokioAsyncReadSolutionsReaderKind::Xml(reader) => reader.read_next().await,
FromTokioAsyncReadSolutionsReaderKind::Tsv(reader) => reader.read_next().await,
} }
.transpose()? .transpose()?
.map(|values| (Arc::clone(&self.variables), values).into()), .map(|values| (Arc::clone(&self.variables), values).into()),

Loading…
Cancel
Save