Parsers do not need BufRead anymore

pull/580/head
Tpt 1 year ago committed by Thomas Tanon
parent 077c1fc1a8
commit 922023b1da
  1. 8
      lib/benches/store.rs
  2. 22
      lib/src/io/read.rs
  3. 3
      lib/src/sparql/update.rs
  4. 14
      lib/src/store.rs
  5. 30
      python/src/io.rs
  6. 25
      server/src/main.rs
  7. 8
      testsuite/src/files.rs
  8. 7
      testsuite/src/sparql_evaluator.rs

@ -92,7 +92,7 @@ fn store_query_and_update(c: &mut Criterion) {
.read_to_end(&mut data) .read_to_end(&mut data)
.unwrap(); .unwrap();
let operations = read_data("mix-exploreAndUpdate-1000.tsv.zst") let operations = BufReader::new(read_data("mix-exploreAndUpdate-1000.tsv.zst"))
.lines() .lines()
.map(|l| { .map(|l| {
let l = l.unwrap(); let l = l.unwrap();
@ -167,7 +167,7 @@ fn sparql_parsing(c: &mut Criterion) {
.read_to_end(&mut data) .read_to_end(&mut data)
.unwrap(); .unwrap();
let operations = read_data("mix-exploreAndUpdate-1000.tsv.zst") let operations = BufReader::new(read_data("mix-exploreAndUpdate-1000.tsv.zst"))
.lines() .lines()
.map(|l| { .map(|l| {
let l = l.unwrap(); let l = l.unwrap();
@ -213,7 +213,7 @@ criterion_group!(store, sparql_parsing, store_query_and_update, store_load);
criterion_main!(store); criterion_main!(store);
fn read_data(file: &str) -> impl BufRead { fn read_data(file: &str) -> impl Read {
if !Path::new(file).exists() { if !Path::new(file).exists() {
let mut client = oxhttp::Client::new(); let mut client = oxhttp::Client::new();
client.set_redirection_limit(5); client.set_redirection_limit(5);
@ -228,7 +228,7 @@ fn read_data(file: &str) -> impl BufRead {
); );
std::io::copy(&mut response.into_body(), &mut File::create(file).unwrap()).unwrap(); std::io::copy(&mut response.into_body(), &mut File::create(file).unwrap()).unwrap();
} }
BufReader::new(zstd::Decoder::new(File::open(file).unwrap()).unwrap()) zstd::Decoder::new(File::open(file).unwrap()).unwrap()
} }
#[derive(Clone)] #[derive(Clone)]

@ -10,7 +10,7 @@ use oxttl::ntriples::{FromReadNTriplesReader, NTriplesParser};
use oxttl::trig::{FromReadTriGReader, TriGParser}; use oxttl::trig::{FromReadTriGReader, TriGParser};
use oxttl::turtle::{FromReadTurtleReader, TurtleParser}; use oxttl::turtle::{FromReadTurtleReader, TurtleParser};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::BufRead; use std::io::Read;
/// Parsers for RDF graph serialization formats. /// Parsers for RDF graph serialization formats.
/// ///
@ -83,8 +83,8 @@ impl GraphParser {
}) })
} }
/// Executes the parsing itself on a [`BufRead`] implementation and returns an iterator of triples. /// Executes the parsing itself on a [`Read`] implementation and returns an iterator of triples.
pub fn read_triples<R: BufRead>(&self, reader: R) -> TripleReader<R> { pub fn read_triples<R: Read>(&self, reader: R) -> TripleReader<R> {
TripleReader { TripleReader {
mapper: BlankNodeMapper::default(), mapper: BlankNodeMapper::default(),
parser: match &self.inner { parser: match &self.inner {
@ -114,19 +114,19 @@ impl GraphParser {
/// # std::io::Result::Ok(()) /// # std::io::Result::Ok(())
/// ``` /// ```
#[must_use] #[must_use]
pub struct TripleReader<R: BufRead> { pub struct TripleReader<R: Read> {
mapper: BlankNodeMapper, mapper: BlankNodeMapper,
parser: TripleReaderKind<R>, parser: TripleReaderKind<R>,
} }
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum TripleReaderKind<R: BufRead> { enum TripleReaderKind<R: Read> {
NTriples(FromReadNTriplesReader<R>), NTriples(FromReadNTriplesReader<R>),
Turtle(FromReadTurtleReader<R>), Turtle(FromReadTurtleReader<R>),
RdfXml(FromReadRdfXmlReader<R>), RdfXml(FromReadRdfXmlReader<R>),
} }
impl<R: BufRead> Iterator for TripleReader<R> { impl<R: Read> Iterator for TripleReader<R> {
type Item = Result<Triple, ParseError>; type Item = Result<Triple, ParseError>;
fn next(&mut self) -> Option<Result<Triple, ParseError>> { fn next(&mut self) -> Option<Result<Triple, ParseError>> {
@ -214,8 +214,8 @@ impl DatasetParser {
}) })
} }
/// Executes the parsing itself on a [`BufRead`] implementation and returns an iterator of quads. /// Executes the parsing itself on a [`Read`] implementation and returns an iterator of quads.
pub fn read_quads<R: BufRead>(&self, reader: R) -> QuadReader<R> { pub fn read_quads<R: Read>(&self, reader: R) -> QuadReader<R> {
QuadReader { QuadReader {
mapper: BlankNodeMapper::default(), mapper: BlankNodeMapper::default(),
parser: match &self.inner { parser: match &self.inner {
@ -242,17 +242,17 @@ impl DatasetParser {
/// # std::io::Result::Ok(()) /// # std::io::Result::Ok(())
/// ``` /// ```
#[must_use] #[must_use]
pub struct QuadReader<R: BufRead> { pub struct QuadReader<R: Read> {
mapper: BlankNodeMapper, mapper: BlankNodeMapper,
parser: QuadReaderKind<R>, parser: QuadReaderKind<R>,
} }
enum QuadReaderKind<R: BufRead> { enum QuadReaderKind<R: Read> {
NQuads(FromReadNQuadsReader<R>), NQuads(FromReadNQuadsReader<R>),
TriG(FromReadTriGReader<R>), TriG(FromReadTriGReader<R>),
} }
impl<R: BufRead> Iterator for QuadReader<R> { impl<R: Read> Iterator for QuadReader<R> {
type Item = Result<Quad, ParseError>; type Item = Result<Quad, ParseError>;
fn next(&mut self) -> Option<Result<Quad, ParseError>> { fn next(&mut self) -> Option<Result<Quad, ParseError>> {

@ -18,7 +18,6 @@ use spargebra::term::{
use spargebra::GraphUpdateOperation; use spargebra::GraphUpdateOperation;
use sparopt::Optimizer; use sparopt::Optimizer;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::BufReader;
use std::rc::Rc; use std::rc::Rc;
pub fn evaluate_update<'a, 'b: 'a>( pub fn evaluate_update<'a, 'b: 'a>(
@ -182,7 +181,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> {
.with_base_iri(base_iri.as_str()) .with_base_iri(base_iri.as_str())
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
} }
for t in parser.read_triples(BufReader::new(body)) { for t in parser.read_triples(body) {
self.transaction self.transaction
.insert(t?.as_ref().in_graph(to_graph_name))?; .insert(t?.as_ref().in_graph(to_graph_name))?;
} }

@ -40,7 +40,7 @@ use crate::storage::{
}; };
pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError}; pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError};
use std::error::Error; use std::error::Error;
use std::io::{BufRead, Write}; use std::io::{Read, Write};
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
use std::path::Path; use std::path::Path;
use std::{fmt, str}; use std::{fmt, str};
@ -469,7 +469,7 @@ impl Store {
/// ``` /// ```
pub fn load_graph<'a>( pub fn load_graph<'a>(
&self, &self,
reader: impl BufRead, reader: impl Read,
format: GraphFormat, format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>, to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>, base_iri: Option<&str>,
@ -513,7 +513,7 @@ impl Store {
/// ``` /// ```
pub fn load_dataset( pub fn load_dataset(
&self, &self,
reader: impl BufRead, reader: impl Read,
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), LoaderError> { ) -> Result<(), LoaderError> {
@ -1077,7 +1077,7 @@ impl<'a> Transaction<'a> {
/// ``` /// ```
pub fn load_graph<'b>( pub fn load_graph<'b>(
&mut self, &mut self,
reader: impl BufRead, reader: impl Read,
format: GraphFormat, format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'b>>, to_graph_name: impl Into<GraphNameRef<'b>>,
base_iri: Option<&str>, base_iri: Option<&str>,
@ -1119,7 +1119,7 @@ impl<'a> Transaction<'a> {
/// ``` /// ```
pub fn load_dataset( pub fn load_dataset(
&mut self, &mut self,
reader: impl BufRead, reader: impl Read,
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), LoaderError> { ) -> Result<(), LoaderError> {
@ -1462,7 +1462,7 @@ impl BulkLoader {
/// ``` /// ```
pub fn load_dataset( pub fn load_dataset(
&self, &self,
reader: impl BufRead, reader: impl Read,
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), LoaderError> { ) -> Result<(), LoaderError> {
@ -1517,7 +1517,7 @@ impl BulkLoader {
/// ``` /// ```
pub fn load_graph<'a>( pub fn load_graph<'a>(
&self, &self,
reader: impl BufRead, reader: impl Read,
format: GraphFormat, format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>, to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>, base_iri: Option<&str>,

@ -12,7 +12,7 @@ use pyo3::{intern, wrap_pyfunction};
use std::cmp::max; use std::cmp::max;
use std::error::Error; use std::error::Error;
use std::fs::File; use std::fs::File;
use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Write}; use std::io::{self, BufWriter, Cursor, Read, Write};
pub fn add_to_module(module: &PyModule) -> PyResult<()> { pub fn add_to_module(module: &PyModule) -> PyResult<()> {
module.add_wrapped(wrap_pyfunction!(parse))?; module.add_wrapped(wrap_pyfunction!(parse))?;
@ -193,15 +193,13 @@ impl PyQuadReader {
pub enum PyReadable { pub enum PyReadable {
Bytes(Cursor<Vec<u8>>), Bytes(Cursor<Vec<u8>>),
Io(BufReader<PyIo>), Io(PyIo),
File(BufReader<File>), File(File),
} }
impl PyReadable { impl PyReadable {
pub fn from_file(file: &str, py: Python<'_>) -> io::Result<Self> { pub fn from_file(file: &str, py: Python<'_>) -> io::Result<Self> {
Ok(Self::File(BufReader::new( Ok(Self::File(py.allow_threads(|| File::open(file))?))
py.allow_threads(|| File::open(file))?,
)))
} }
pub fn from_data(data: PyObject, py: Python<'_>) -> Self { pub fn from_data(data: PyObject, py: Python<'_>) -> Self {
@ -210,7 +208,7 @@ impl PyReadable {
} else if let Ok(string) = data.extract::<String>(py) { } else if let Ok(string) = data.extract::<String>(py) {
Self::Bytes(Cursor::new(string.into_bytes())) Self::Bytes(Cursor::new(string.into_bytes()))
} else { } else {
Self::Io(BufReader::new(PyIo(data))) Self::Io(PyIo(data))
} }
} }
} }
@ -225,24 +223,6 @@ impl Read for PyReadable {
} }
} }
impl BufRead for PyReadable {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
match self {
Self::Bytes(bytes) => bytes.fill_buf(),
Self::Io(io) => io.fill_buf(),
Self::File(file) => file.fill_buf(),
}
}
fn consume(&mut self, amt: usize) {
match self {
Self::Bytes(bytes) => bytes.consume(amt),
Self::Io(io) => io.consume(amt),
Self::File(file) => file.consume(amt),
}
}
}
pub enum PyWritable { pub enum PyWritable {
Io(BufWriter<PyIo>), Io(BufWriter<PyIo>),
File(BufWriter<File>), File(BufWriter<File>),

@ -21,7 +21,7 @@ use std::cmp::{max, min};
use std::env; use std::env;
use std::ffi::OsStr; use std::ffi::OsStr;
use std::fs::File; use std::fs::File;
use std::io::{self, stdin, stdout, BufRead, BufReader, BufWriter, Read, Write}; use std::io::{self, stdin, stdout, BufWriter, Read, Write};
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
use std::os::unix::net::UnixDatagram; use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -377,7 +377,7 @@ pub fn main() -> anyhow::Result<()> {
if file.extension().map_or(false, |e| e == OsStr::new("gz")) { if file.extension().map_or(false, |e| e == OsStr::new("gz")) {
bulk_load( bulk_load(
&loader, &loader,
BufReader::new(MultiGzDecoder::new(fp)), MultiGzDecoder::new(fp),
format.unwrap_or_else(|| { format.unwrap_or_else(|| {
GraphOrDatasetFormat::from_path( GraphOrDatasetFormat::from_path(
&file.with_extension(""), &file.with_extension(""),
@ -390,7 +390,7 @@ pub fn main() -> anyhow::Result<()> {
} else { } else {
bulk_load( bulk_load(
&loader, &loader,
BufReader::new(fp), fp,
format.unwrap_or_else(|| { format.unwrap_or_else(|| {
GraphOrDatasetFormat::from_path(&file).unwrap() GraphOrDatasetFormat::from_path(&file).unwrap()
}), }),
@ -645,7 +645,7 @@ pub fn main() -> anyhow::Result<()> {
fn bulk_load( fn bulk_load(
loader: &BulkLoader, loader: &BulkLoader,
reader: impl BufRead, reader: impl Read,
format: GraphOrDatasetFormat, format: GraphOrDatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
to_graph_name: Option<NamedNodeRef<'_>>, to_graph_name: Option<NamedNodeRef<'_>>,
@ -1531,18 +1531,13 @@ fn web_load_graph(
}; };
if url_query_parameter(request, "no_transaction").is_some() { if url_query_parameter(request, "no_transaction").is_some() {
web_bulk_loader(store, request).load_graph( web_bulk_loader(store, request).load_graph(
BufReader::new(request.body_mut()), request.body_mut(),
format, format,
to_graph_name, to_graph_name,
base_iri, base_iri,
) )
} else { } else {
store.load_graph( store.load_graph(request.body_mut(), format, to_graph_name, base_iri)
BufReader::new(request.body_mut()),
format,
to_graph_name,
base_iri,
)
} }
.map_err(loader_to_http_error) .map_err(loader_to_http_error)
} }
@ -1553,13 +1548,9 @@ fn web_load_dataset(
format: DatasetFormat, format: DatasetFormat,
) -> Result<(), HttpError> { ) -> Result<(), HttpError> {
if url_query_parameter(request, "no_transaction").is_some() { if url_query_parameter(request, "no_transaction").is_some() {
web_bulk_loader(store, request).load_dataset( web_bulk_loader(store, request).load_dataset(request.body_mut(), format, None)
BufReader::new(request.body_mut()),
format,
None,
)
} else { } else {
store.load_dataset(BufReader::new(request.body_mut()), format, None) store.load_dataset(request.body_mut(), format, None)
} }
.map_err(loader_to_http_error) .map_err(loader_to_http_error)
} }

@ -1,13 +1,13 @@
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Context, Result};
use oxigraph::io::{DatasetFormat, DatasetParser, GraphFormat, GraphParser}; use oxigraph::io::{DatasetFormat, DatasetParser, GraphFormat, GraphParser};
use oxigraph::model::{Dataset, Graph}; use oxigraph::model::{Dataset, Graph};
use oxttl::n3::N3Quad; use oxttl::n3::N3Quad;
use oxttl::N3Parser; use oxttl::N3Parser;
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader, Read}; use std::io::Read;
use std::path::PathBuf; use std::path::PathBuf;
pub fn read_file(url: &str) -> Result<impl BufRead> { pub fn read_file(url: &str) -> Result<impl Read> {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push(if url.starts_with("http://w3c.github.io/") { path.push(if url.starts_with("http://w3c.github.io/") {
url.replace("http://w3c.github.io/", "") url.replace("http://w3c.github.io/", "")
@ -25,7 +25,7 @@ pub fn read_file(url: &str) -> Result<impl BufRead> {
} else { } else {
bail!("Not supported url for file: {url}") bail!("Not supported url for file: {url}")
}); });
Ok(BufReader::new(File::open(&path)?)) File::open(&path).with_context(|| format!("Failed to read {}", path.display()))
} }
pub fn read_file_to_string(url: &str) -> Result<String> { pub fn read_file_to_string(url: &str) -> Result<String> {

@ -11,7 +11,7 @@ use oxigraph::store::Store;
use sparopt::Optimizer; use sparopt::Optimizer;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Write; use std::fmt::Write;
use std::io::{self, Cursor}; use std::io::{self, BufReader, Cursor};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
@ -296,7 +296,10 @@ fn load_sparql_query_result(url: &str) -> Result<StaticQueryResults> {
.rsplit_once('.') .rsplit_once('.')
.and_then(|(_, extension)| QueryResultsFormat::from_extension(extension)) .and_then(|(_, extension)| QueryResultsFormat::from_extension(extension))
{ {
StaticQueryResults::from_query_results(QueryResults::read(read_file(url)?, format)?, false) StaticQueryResults::from_query_results(
QueryResults::read(BufReader::new(read_file(url)?), format)?,
false,
)
} else { } else {
StaticQueryResults::from_graph(&load_graph(url, guess_graph_format(url)?, false)?) StaticQueryResults::from_graph(&load_graph(url, guess_graph_format(url)?, false)?)
} }

Loading…
Cancel
Save