From 922023b1da2dd47091c896d4098a2f049adf9b86 Mon Sep 17 00:00:00 2001 From: Tpt Date: Fri, 21 Jul 2023 18:52:42 +0200 Subject: [PATCH] Parsers do not need BufRead anymore --- lib/benches/store.rs | 8 ++++---- lib/src/io/read.rs | 22 +++++++++++----------- lib/src/sparql/update.rs | 3 +-- lib/src/store.rs | 14 +++++++------- python/src/io.rs | 30 +++++------------------------- server/src/main.rs | 25 ++++++++----------------- testsuite/src/files.rs | 8 ++++---- testsuite/src/sparql_evaluator.rs | 7 +++++-- 8 files changed, 45 insertions(+), 72 deletions(-) diff --git a/lib/benches/store.rs b/lib/benches/store.rs index 0f35a623..17779b19 100644 --- a/lib/benches/store.rs +++ b/lib/benches/store.rs @@ -92,7 +92,7 @@ fn store_query_and_update(c: &mut Criterion) { .read_to_end(&mut data) .unwrap(); - let operations = read_data("mix-exploreAndUpdate-1000.tsv.zst") + let operations = BufReader::new(read_data("mix-exploreAndUpdate-1000.tsv.zst")) .lines() .map(|l| { let l = l.unwrap(); @@ -167,7 +167,7 @@ fn sparql_parsing(c: &mut Criterion) { .read_to_end(&mut data) .unwrap(); - let operations = read_data("mix-exploreAndUpdate-1000.tsv.zst") + let operations = BufReader::new(read_data("mix-exploreAndUpdate-1000.tsv.zst")) .lines() .map(|l| { let l = l.unwrap(); @@ -213,7 +213,7 @@ criterion_group!(store, sparql_parsing, store_query_and_update, store_load); criterion_main!(store); -fn read_data(file: &str) -> impl BufRead { +fn read_data(file: &str) -> impl Read { if !Path::new(file).exists() { let mut client = oxhttp::Client::new(); client.set_redirection_limit(5); @@ -228,7 +228,7 @@ fn read_data(file: &str) -> impl BufRead { ); std::io::copy(&mut response.into_body(), &mut File::create(file).unwrap()).unwrap(); } - BufReader::new(zstd::Decoder::new(File::open(file).unwrap()).unwrap()) + zstd::Decoder::new(File::open(file).unwrap()).unwrap() } #[derive(Clone)] diff --git a/lib/src/io/read.rs b/lib/src/io/read.rs index 9489a168..3b3cadc4 100644 --- a/lib/src/io/read.rs +++ b/lib/src/io/read.rs @@ -10,7 +10,7 @@ use oxttl::ntriples::{FromReadNTriplesReader, NTriplesParser}; use oxttl::trig::{FromReadTriGReader, TriGParser}; use oxttl::turtle::{FromReadTurtleReader, TurtleParser}; use std::collections::HashMap; -use std::io::BufRead; +use std::io::Read; /// Parsers for RDF graph serialization formats. /// @@ -83,8 +83,8 @@ impl GraphParser { }) } - /// Executes the parsing itself on a [`BufRead`] implementation and returns an iterator of triples. - pub fn read_triples(&self, reader: R) -> TripleReader { + /// Executes the parsing itself on a [`Read`] implementation and returns an iterator of triples. + pub fn read_triples(&self, reader: R) -> TripleReader { TripleReader { mapper: BlankNodeMapper::default(), parser: match &self.inner { @@ -114,19 +114,19 @@ impl GraphParser { /// # std::io::Result::Ok(()) /// ``` #[must_use] -pub struct TripleReader { +pub struct TripleReader { mapper: BlankNodeMapper, parser: TripleReaderKind, } #[allow(clippy::large_enum_variant)] -enum TripleReaderKind { +enum TripleReaderKind { NTriples(FromReadNTriplesReader), Turtle(FromReadTurtleReader), RdfXml(FromReadRdfXmlReader), } -impl Iterator for TripleReader { +impl Iterator for TripleReader { type Item = Result; fn next(&mut self) -> Option> { @@ -214,8 +214,8 @@ impl DatasetParser { }) } - /// Executes the parsing itself on a [`BufRead`] implementation and returns an iterator of quads. - pub fn read_quads(&self, reader: R) -> QuadReader { + /// Executes the parsing itself on a [`Read`] implementation and returns an iterator of quads. + pub fn read_quads(&self, reader: R) -> QuadReader { QuadReader { mapper: BlankNodeMapper::default(), parser: match &self.inner { @@ -242,17 +242,17 @@ impl DatasetParser { /// # std::io::Result::Ok(()) /// ``` #[must_use] -pub struct QuadReader { +pub struct QuadReader { mapper: BlankNodeMapper, parser: QuadReaderKind, } -enum QuadReaderKind { +enum QuadReaderKind { NQuads(FromReadNQuadsReader), TriG(FromReadTriGReader), } -impl Iterator for QuadReader { +impl Iterator for QuadReader { type Item = Result; fn next(&mut self) -> Option> { diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index fb68c561..a62f99a8 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -18,7 +18,6 @@ use spargebra::term::{ use spargebra::GraphUpdateOperation; use sparopt::Optimizer; use std::collections::HashMap; -use std::io::BufReader; use std::rc::Rc; pub fn evaluate_update<'a, 'b: 'a>( @@ -182,7 +181,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> { .with_base_iri(base_iri.as_str()) .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; } - for t in parser.read_triples(BufReader::new(body)) { + for t in parser.read_triples(body) { self.transaction .insert(t?.as_ref().in_graph(to_graph_name))?; } diff --git a/lib/src/store.rs b/lib/src/store.rs index c174bcb1..eed5dce2 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -40,7 +40,7 @@ use crate::storage::{ }; pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError}; use std::error::Error; -use std::io::{BufRead, Write}; +use std::io::{Read, Write}; #[cfg(not(target_family = "wasm"))] use std::path::Path; use std::{fmt, str}; @@ -469,7 +469,7 @@ impl Store { /// ``` pub fn load_graph<'a>( &self, - reader: impl BufRead, + reader: impl Read, format: GraphFormat, to_graph_name: impl Into>, base_iri: Option<&str>, @@ -513,7 +513,7 @@ impl Store { /// ``` pub fn load_dataset( &self, - reader: impl BufRead, + reader: impl Read, format: DatasetFormat, base_iri: Option<&str>, ) -> Result<(), LoaderError> { @@ -1077,7 +1077,7 @@ impl<'a> Transaction<'a> { /// ``` pub fn load_graph<'b>( &mut self, - reader: impl BufRead, + reader: impl Read, format: GraphFormat, to_graph_name: impl Into>, base_iri: Option<&str>, @@ -1119,7 +1119,7 @@ impl<'a> Transaction<'a> { /// ``` pub fn load_dataset( &mut self, - reader: impl BufRead, + reader: impl Read, format: DatasetFormat, base_iri: Option<&str>, ) -> Result<(), LoaderError> { @@ -1462,7 +1462,7 @@ impl BulkLoader { /// ``` pub fn load_dataset( &self, - reader: impl BufRead, + reader: impl Read, format: DatasetFormat, base_iri: Option<&str>, ) -> Result<(), LoaderError> { @@ -1517,7 +1517,7 @@ impl BulkLoader { /// ``` pub fn load_graph<'a>( &self, - reader: impl BufRead, + reader: impl Read, format: GraphFormat, to_graph_name: impl Into>, base_iri: Option<&str>, diff --git a/python/src/io.rs b/python/src/io.rs index 382a8c79..d29bfb9d 100644 --- a/python/src/io.rs +++ b/python/src/io.rs @@ -12,7 +12,7 @@ use pyo3::{intern, wrap_pyfunction}; use std::cmp::max; use std::error::Error; use std::fs::File; -use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Write}; +use std::io::{self, BufWriter, Cursor, Read, Write}; pub fn add_to_module(module: &PyModule) -> PyResult<()> { module.add_wrapped(wrap_pyfunction!(parse))?; @@ -193,15 +193,13 @@ impl PyQuadReader { pub enum PyReadable { Bytes(Cursor>), - Io(BufReader), - File(BufReader), + Io(PyIo), + File(File), } impl PyReadable { pub fn from_file(file: &str, py: Python<'_>) -> io::Result { - Ok(Self::File(BufReader::new( - py.allow_threads(|| File::open(file))?, - ))) + Ok(Self::File(py.allow_threads(|| File::open(file))?)) } pub fn from_data(data: PyObject, py: Python<'_>) -> Self { @@ -210,7 +208,7 @@ impl PyReadable { } else if let Ok(string) = data.extract::(py) { Self::Bytes(Cursor::new(string.into_bytes())) } else { - Self::Io(BufReader::new(PyIo(data))) + Self::Io(PyIo(data)) } } } @@ -225,24 +223,6 @@ impl Read for PyReadable { } } -impl BufRead for PyReadable { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - match self { - Self::Bytes(bytes) => bytes.fill_buf(), - Self::Io(io) => io.fill_buf(), - Self::File(file) => file.fill_buf(), - } - } - - fn consume(&mut self, amt: usize) { - match self { - Self::Bytes(bytes) => bytes.consume(amt), - Self::Io(io) => io.consume(amt), - Self::File(file) => file.consume(amt), - } - } -} - pub enum PyWritable { Io(BufWriter), File(BufWriter), diff --git a/server/src/main.rs b/server/src/main.rs index 7474cae4..5db006b9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -21,7 +21,7 @@ use std::cmp::{max, min}; use std::env; use std::ffi::OsStr; use std::fs::File; -use std::io::{self, stdin, stdout, BufRead, BufReader, BufWriter, Read, Write}; +use std::io::{self, stdin, stdout, BufWriter, Read, Write}; #[cfg(target_os = "linux")] use std::os::unix::net::UnixDatagram; use std::path::{Path, PathBuf}; @@ -377,7 +377,7 @@ pub fn main() -> anyhow::Result<()> { if file.extension().map_or(false, |e| e == OsStr::new("gz")) { bulk_load( &loader, - BufReader::new(MultiGzDecoder::new(fp)), + MultiGzDecoder::new(fp), format.unwrap_or_else(|| { GraphOrDatasetFormat::from_path( &file.with_extension(""), @@ -390,7 +390,7 @@ pub fn main() -> anyhow::Result<()> { } else { bulk_load( &loader, - BufReader::new(fp), + fp, format.unwrap_or_else(|| { GraphOrDatasetFormat::from_path(&file).unwrap() }), @@ -645,7 +645,7 @@ pub fn main() -> anyhow::Result<()> { fn bulk_load( loader: &BulkLoader, - reader: impl BufRead, + reader: impl Read, format: GraphOrDatasetFormat, base_iri: Option<&str>, to_graph_name: Option>, @@ -1531,18 +1531,13 @@ fn web_load_graph( }; if url_query_parameter(request, "no_transaction").is_some() { web_bulk_loader(store, request).load_graph( - BufReader::new(request.body_mut()), + request.body_mut(), format, to_graph_name, base_iri, ) } else { - store.load_graph( - BufReader::new(request.body_mut()), - format, - to_graph_name, - base_iri, - ) + store.load_graph(request.body_mut(), format, to_graph_name, base_iri) } .map_err(loader_to_http_error) } @@ -1553,13 +1548,9 @@ fn web_load_dataset( format: DatasetFormat, ) -> Result<(), HttpError> { if url_query_parameter(request, "no_transaction").is_some() { - web_bulk_loader(store, request).load_dataset( - BufReader::new(request.body_mut()), - format, - None, - ) + web_bulk_loader(store, request).load_dataset(request.body_mut(), format, None) } else { - store.load_dataset(BufReader::new(request.body_mut()), format, None) + store.load_dataset(request.body_mut(), format, None) } .map_err(loader_to_http_error) } diff --git a/testsuite/src/files.rs b/testsuite/src/files.rs index f7101e9d..2d9914c7 100644 --- a/testsuite/src/files.rs +++ b/testsuite/src/files.rs @@ -1,13 +1,13 @@ -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, Context, Result}; use oxigraph::io::{DatasetFormat, DatasetParser, GraphFormat, GraphParser}; use oxigraph::model::{Dataset, Graph}; use oxttl::n3::N3Quad; use oxttl::N3Parser; use std::fs::File; -use std::io::{BufRead, BufReader, Read}; +use std::io::Read; use std::path::PathBuf; -pub fn read_file(url: &str) -> Result { +pub fn read_file(url: &str) -> Result { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); path.push(if url.starts_with("http://w3c.github.io/") { url.replace("http://w3c.github.io/", "") @@ -25,7 +25,7 @@ pub fn read_file(url: &str) -> Result { } else { bail!("Not supported url for file: {url}") }); - Ok(BufReader::new(File::open(&path)?)) + File::open(&path).with_context(|| format!("Failed to read {}", path.display())) } pub fn read_file_to_string(url: &str) -> Result { diff --git a/testsuite/src/sparql_evaluator.rs b/testsuite/src/sparql_evaluator.rs index da4f0347..ab2b6643 100644 --- a/testsuite/src/sparql_evaluator.rs +++ b/testsuite/src/sparql_evaluator.rs @@ -11,7 +11,7 @@ use oxigraph::store::Store; use sparopt::Optimizer; use std::collections::HashMap; use std::fmt::Write; -use std::io::{self, Cursor}; +use std::io::{self, BufReader, Cursor}; use std::str::FromStr; use std::sync::Arc; @@ -296,7 +296,10 @@ fn load_sparql_query_result(url: &str) -> Result { .rsplit_once('.') .and_then(|(_, extension)| QueryResultsFormat::from_extension(extension)) { - StaticQueryResults::from_query_results(QueryResults::read(read_file(url)?, format)?, false) + StaticQueryResults::from_query_results( + QueryResults::read(BufReader::new(read_file(url)?), format)?, + false, + ) } else { StaticQueryResults::from_graph(&load_graph(url, guess_graph_format(url)?, false)?) }