@ -1,10 +1,10 @@
//! Implementation of [SPARQL 1.1 Query Results CSV and TSV Formats](https://www.w3.org/TR/sparql11-results-csv-tsv/)
use crate ::error ::{ ParseError , SyntaxError , SyntaxErrorKind } ;
use crate ::error ::{ ParseError , SyntaxError , SyntaxErrorKind , TextPosition } ;
use memchr ::memchr ;
use oxrdf ::Variable ;
use oxrdf ::{ vocab ::xsd , * } ;
use std ::io ::{ self , BufRead , Read , Write } ;
use std ::io ::{ self , Read , Write } ;
use std ::str ::{ self , FromStr } ;
const MAX_BUFFER_SIZE : usize = 4096 * 4096 ;
@ -283,12 +283,13 @@ pub enum TsvQueryResultsReader<R: Read> {
}
impl < R : Read > TsvQueryResultsReader < R > {
pub fn read ( read : R ) -> Result < Self , ParseError > {
let mut reader = LineReader ::new ( read ) ;
pub fn read ( mut read : R ) -> Result < Self , ParseError > {
let mut reader = LineReader ::new ( ) ;
let mut buffer = Vec ::new ( ) ;
// We read the header
let line = reader
. next_line ( ) ?
. next_line ( & mut buffer , & mut read ) ?
. trim_matches ( | c | matches! ( c , ' ' | '\r' | '\n' ) ) ;
if line . eq_ignore_ascii_case ( "true" ) {
return Ok ( Self ::Boolean ( true ) ) ;
@ -318,34 +319,65 @@ impl<R: Read> TsvQueryResultsReader<R> {
let column_len = variables . len ( ) ;
Ok ( Self ::Solutions {
variables ,
solutions : TsvSolutionsReader { reader , column_len } ,
solutions : TsvSolutionsReader {
read ,
buffer ,
reader ,
column_len ,
} ,
} )
}
}
pub struct TsvSolutionsReader < R : Read > {
reader : LineReader < R > ,
read : R ,
buffer : Vec < u8 > ,
reader : LineReader ,
column_len : usize ,
}
impl < R : BufRead > TsvSolutionsReader < R > {
impl < R : Read > TsvSolutionsReader < R > {
#[ allow(clippy::unwrap_in_result) ]
pub fn read_next ( & mut self ) -> Result < Option < Vec < Option < Term > > > , ParseError > {
let line = self . reader . next_line ( ) ? ;
let line = self . reader . next_line ( & mut self . buffer , & mut self . read ) ? ;
if line . is_empty ( ) {
return Ok ( None ) ; // EOF
}
let elements = line
. split ( '\t' )
. map ( | v | {
. enumerate ( )
. map ( | ( i , v ) | {
let v = v . trim ( ) ;
if v . is_empty ( ) {
Ok ( None )
} else {
Ok ( Some ( Term ::from_str ( v ) . map_err ( | e | SyntaxError {
inner : SyntaxErrorKind ::Term {
error : e ,
term : v . into ( ) ,
} ,
Ok ( Some ( Term ::from_str ( v ) . map_err ( | e | {
let start_position_char = line
. split ( '\t' )
. take ( i )
. map ( | c | c . chars ( ) . count ( ) + 1 )
. sum ::< usize > ( ) ;
let start_position_bytes =
line . split ( '\t' ) . take ( i ) . map ( | c | c . len ( ) + 1 ) . sum ::< usize > ( ) ;
SyntaxError {
inner : SyntaxErrorKind ::Term {
error : e ,
term : v . into ( ) ,
location : TextPosition {
line : self . reader . line_count - 1 ,
column : start_position_char . try_into ( ) . unwrap ( ) ,
offset : self . reader . last_line_start
+ u64 ::try_from ( start_position_bytes ) . unwrap ( ) ,
} .. TextPosition {
line : self . reader . line_count - 1 ,
column : ( start_position_char + v . chars ( ) . count ( ) )
. try_into ( )
. unwrap ( ) ,
offset : self . reader . last_line_start
+ u64 ::try_from ( start_position_bytes + v . len ( ) ) . unwrap ( ) ,
} ,
} ,
}
} ) ? ) )
}
} )
@ -355,64 +387,88 @@ impl<R: BufRead> TsvSolutionsReader<R> {
} else if self . column_len = = 0 & & elements = = [ None ] {
Ok ( Some ( Vec ::new ( ) ) ) // Zero columns case
} else {
Err ( SyntaxError ::msg ( format! (
"This TSV files has {} columns but we found a row with {} columns: {}" ,
self . column_len ,
elements . len ( ) ,
line
) )
Err ( SyntaxError ::located_message (
format! (
"This TSV files has {} columns but we found a row on line {} with {} columns: {}" ,
self . column_len ,
self . reader . line_count - 1 ,
elements . len ( ) ,
line
) ,
TextPosition {
line : self . reader . line_count - 1 ,
column : 0 ,
offset : self . reader . last_line_start ,
} .. TextPosition {
line : self . reader . line_count - 1 ,
column : line . chars ( ) . count ( ) . try_into ( ) . unwrap ( ) ,
offset : self . reader . last_line_end ,
} ,
)
. into ( ) )
}
}
}
struct LineReader < R : Read > {
read : R ,
buffer : Vec < u8 > ,
start : usize ,
end : usize ,
struct LineReader {
buffer_start : usize ,
buffer_end : usize ,
line_count : u64 ,
last_line_start : u64 ,
last_line_end : u64 ,
}
impl < R : Read > LineReader < R > {
fn new ( read : R ) -> Self {
impl LineReader {
fn new ( ) -> Self {
Self {
read ,
buffer : Vec ::new ( ) ,
start : 0 ,
end : 0 ,
buffer_start : 0 ,
buffer_end : 0 ,
line_count : 0 ,
last_line_start : 0 ,
last_line_end : 0 ,
}
}
fn next_line ( & mut self ) -> io ::Result < & str > {
self . buffer . copy_within ( self . start .. self . end , 0 ) ;
self . end - = self . start ;
self . start = 0 ;
#[ allow(clippy::unwrap_in_result) ]
fn next_line < ' a > (
& mut self ,
buffer : & ' a mut Vec < u8 > ,
read : & mut impl Read ,
) -> io ::Result < & ' a str > {
let line_end = loop {
if let Some ( eol ) = memchr ( b'\n' , & self . buffer [ self . start .. self . end ] ) {
break self . start + eol + 1 ;
if let Some ( eol ) = memchr ( b'\n' , & buffer [ self . buffer_start .. self . buffer_end ] ) {
break self . buffer_start + eol + 1 ;
}
if self . buffer_start > buffer . len ( ) / 2 {
buffer . copy_within ( self . buffer_start .. self . buffer_end , 0 ) ;
self . buffer_end - = self . buffer_start ;
self . buffer_start = 0 ;
}
if self . end + 1024 > self . buffer . len ( ) {
if self . end + 1024 > MAX_BUFFER_SIZE {
if self . buffer_ end + 1024 > buffer . len ( ) {
if self . buffer_ end + 1024 > MAX_BUFFER_SIZE {
return Err ( io ::Error ::new (
io ::ErrorKind ::OutOfMemory ,
format! ( "Reached the buffer maximal size of {MAX_BUFFER_SIZE}" ) ,
) ) ;
}
self . buffer . resize ( self . end + 1024 , b'\0' ) ;
buffer . resize ( self . buffer_ end + 1024 , b'\0' ) ;
}
let read = self . read . read ( & mut self . buffer [ self . end .. ] ) ? ;
let read = read . read ( & mut buffer [ self . buffer_ end.. ] ) ? ;
if read = = 0 {
break self . end ;
break self . buffer_ end;
}
self . end + = read ;
self . buffer_ end + = read ;
} ;
let result = str ::from_utf8 ( & self . buffer [ self . start .. line_end ] ) . map_err ( | e | {
let result = str ::from_utf8 ( & buffer [ self . buffer_ start.. line_end ] ) . map_err ( | e | {
io ::Error ::new (
io ::ErrorKind ::InvalidData ,
format! ( "Invalid UTF-8 in the TSV file: {e}" ) ,
)
} ) ;
self . start = line_end ;
self . line_count + = 1 ;
self . last_line_start = self . last_line_end ;
self . last_line_end + = u64 ::try_from ( line_end - self . buffer_start ) . unwrap ( ) ;
self . buffer_start = line_end ;
result
}
}