diff --git a/Cargo.lock b/Cargo.lock index d2568bac..ab40b360 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,6 +194,12 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + [[package]] name = "cast" version = "0.3.0" @@ -1038,6 +1044,7 @@ dependencies = [ "oxiri", "oxrdf", "quick-xml", + "tokio", ] [[package]] @@ -1064,6 +1071,7 @@ dependencies = [ "oxilangtag", "oxiri", "oxrdf", + "tokio", ] [[package]] @@ -1128,6 +1136,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + [[package]] name = "pkg-config" version = "0.3.27" @@ -1293,6 +1307,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81b9228215d82c7b61490fec1de287136b5de6f5700f6e58ea9ad61a7964ca51" dependencies = [ "memchr", + "tokio", ] [[package]] @@ -1785,6 +1800,30 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tokio" +version = "1.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +dependencies = [ + "autocfg", + "bytes", + "pin-project-lite", + "tokio-macros", + "windows-sys 0.48.0", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.18", +] + [[package]] name = "typenum" version = "1.16.0" diff --git a/lib/oxrdfxml/Cargo.toml b/lib/oxrdfxml/Cargo.toml index 3d012e17..895159b5 100644 --- a/lib/oxrdfxml/Cargo.toml +++ b/lib/oxrdfxml/Cargo.toml @@ -13,11 +13,19 @@ Parser for the RDF/XML language edition = "2021" rust-version = "1.65" +[features] +default = [] +async-tokio = ["dep:tokio", "quick-xml/async-tokio"] + [dependencies] oxrdf = { version = "0.2.0-alpha.1-dev", path = "../oxrdf" } oxilangtag = "0.1" oxiri = "0.2" quick-xml = "0.29" +tokio = { version = "1", optional = true, features = ["io-util"] } + +[dev-dependencies] +tokio = { version = "1", features = ["rt", "macros"] } [package.metadata.docs.rs] all-features = true diff --git a/lib/oxrdfxml/src/lib.rs b/lib/oxrdfxml/src/lib.rs index e07e31c8..bcc87308 100644 --- a/lib/oxrdfxml/src/lib.rs +++ b/lib/oxrdfxml/src/lib.rs @@ -9,6 +9,10 @@ mod parser; mod serializer; mod utils; -pub use crate::serializer::{RdfXmlSerializer, ToWriteRdfXmlWriter}; pub use error::{ParseError, SyntaxError}; +#[cfg(feature = "async-tokio")] +pub use parser::FromTokioAsyncReadRdfXmlReader; pub use parser::{FromReadRdfXmlReader, RdfXmlParser}; +#[cfg(feature = "async-tokio")] +pub use serializer::ToTokioAsyncWriteRdfXmlWriter; +pub use serializer::{RdfXmlSerializer, ToWriteRdfXmlWriter}; diff --git a/lib/oxrdfxml/src/parser.rs b/lib/oxrdfxml/src/parser.rs index a726a7d0..999becad 100644 --- a/lib/oxrdfxml/src/parser.rs +++ b/lib/oxrdfxml/src/parser.rs @@ -10,8 +10,10 @@ use quick_xml::events::*; use quick_xml::name::{LocalName, QName, ResolveResult}; use quick_xml::{NsReader, Writer}; use std::collections::{HashMap, HashSet}; -use std::io::{BufRead, BufReader, Read}; +use std::io::{BufReader, Read}; use std::str; +#[cfg(feature = "async-tokio")] +use tokio::io::{AsyncRead, BufReader as AsyncBufReader}; /// A [RDF/XML](https://www.w3.org/TR/rdf-syntax-grammar/) streaming parser. /// @@ -93,23 +95,70 @@ impl RdfXmlParser { /// # Result::<_,Box>::Ok(()) /// ``` pub fn parse_from_read(&self, read: R) -> FromReadRdfXmlReader { - let mut reader = NsReader::from_reader(BufReader::new(read)); - reader.expand_empty_elements(true); FromReadRdfXmlReader { results: Vec::new(), - reader: RdfXmlReader { - reader, - state: vec![RdfXmlState::Doc { - base_iri: self.base.clone(), - }], - custom_entities: HashMap::default(), - in_literal_depth: 0, - known_rdf_id: HashSet::default(), - is_end: false, - }, + reader: self.parse(BufReader::new(read)), + reader_buffer: Vec::default(), + } + } + + /// Parses a RDF/XML file from a [`AsyncRead`] implementation. + /// + /// Count the number of people: + /// ``` + /// use oxrdf::{NamedNodeRef, vocab::rdf}; + /// use oxrdfxml::{RdfXmlParser, ParseError}; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<(), ParseError> { + /// let file = b" + /// + /// + /// + /// Foo + /// + /// + /// "; + /// + /// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); + /// let mut count = 0; + /// let mut parser = RdfXmlParser::new().parse_from_tokio_async_read(file.as_ref()); + /// while let Some(triple) = parser.next().await { + /// let triple = triple?; + /// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { + /// count += 1; + /// } + /// } + /// assert_eq!(2, count); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn parse_from_tokio_async_read( + &self, + read: R, + ) -> FromTokioAsyncReadRdfXmlReader { + FromTokioAsyncReadRdfXmlReader { + results: Vec::new(), + reader: self.parse(AsyncBufReader::new(read)), reader_buffer: Vec::default(), } } + + fn parse(&self, reader: T) -> RdfXmlReader { + let mut reader = NsReader::from_reader(reader); + reader.expand_empty_elements(true); + RdfXmlReader { + reader, + state: vec![RdfXmlState::Doc { + base_iri: self.base.clone(), + }], + custom_entities: HashMap::default(), + in_literal_depth: 0, + known_rdf_id: HashSet::default(), + is_end: false, + } + } } /// Parses a RDF/XML file from a [`Read`] implementation. Can be built using [`RdfXmlParser::parse_from_read`]. @@ -178,6 +227,76 @@ impl FromReadRdfXmlReader { } } +/// Parses a RDF/XML file from a [`AsyncRead`] implementation. Can be built using [`RdfXmlParser::parse_from_tokio_async_read`]. +/// +/// Count the number of people: +/// ``` +/// use oxrdf::{NamedNodeRef, vocab::rdf}; +/// use oxrdfxml::{RdfXmlParser, ParseError}; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<(), ParseError> { +/// let file = b" +/// +/// +/// +/// Foo +/// +/// +/// "; +/// +/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); +/// let mut count = 0; +/// let mut parser = RdfXmlParser::new().parse_from_tokio_async_read(file.as_ref()); +/// while let Some(triple) = parser.next().await { +/// let triple = triple?; +/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { +/// count += 1; +/// } +/// } +/// assert_eq!(2, count); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadRdfXmlReader { + results: Vec, + reader: RdfXmlReader>, + reader_buffer: Vec, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadRdfXmlReader { + /// Reads the next triple or returns `None` if the file is finished. + pub async fn next(&mut self) -> Option> { + loop { + if let Some(triple) = self.results.pop() { + return Some(Ok(triple)); + } else if self.reader.is_end { + return None; + } + if let Err(e) = self.parse_step().await { + return Some(Err(e)); + } + } + } + + /// The current byte position in the input data. + pub fn buffer_position(&self) -> usize { + self.reader.reader.buffer_position() + } + + async fn parse_step(&mut self) -> Result<(), ParseError> { + self.reader_buffer.clear(); + let event = self + .reader + .reader + .read_event_into_async(&mut self.reader_buffer) + .await?; + self.reader.parse_event(event, &mut self.results) + } +} + const RDF_ABOUT: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#about"; const RDF_ABOUT_EACH: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#aboutEach"; const RDF_ABOUT_EACH_PREFIX: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#aboutEachPrefix"; @@ -285,7 +404,7 @@ impl RdfXmlState { } } -struct RdfXmlReader { +struct RdfXmlReader { reader: NsReader, state: Vec, custom_entities: HashMap, @@ -294,7 +413,7 @@ struct RdfXmlReader { is_end: bool, } -impl RdfXmlReader { +impl RdfXmlReader { fn parse_event(&mut self, event: Event, results: &mut Vec) -> Result<(), ParseError> { match event { Event::Start(event) => self.parse_start_event(&event, results), diff --git a/lib/oxrdfxml/src/serializer.rs b/lib/oxrdfxml/src/serializer.rs index 03de4fb9..51802af5 100644 --- a/lib/oxrdfxml/src/serializer.rs +++ b/lib/oxrdfxml/src/serializer.rs @@ -5,6 +5,8 @@ use quick_xml::Writer; use std::io; use std::io::Write; use std::sync::Arc; +#[cfg(feature = "async-tokio")] +use tokio::io::AsyncWrite; /// A [RDF/XML](https://www.w3.org/TR/rdf-syntax-grammar/) serializer. /// @@ -34,7 +36,9 @@ impl RdfXmlSerializer { Self } - /// Writes a RdfXml file to a [`Write`] implementation. + /// Writes a RDF/XML file to a [`Write`] implementation. + /// + /// This writer does unbuffered writes. /// /// ``` /// use oxrdf::{NamedNodeRef, TripleRef}; @@ -56,7 +60,47 @@ impl RdfXmlSerializer { pub fn serialize_to_write(&self, write: W) -> ToWriteRdfXmlWriter { ToWriteRdfXmlWriter { writer: Writer::new_with_indent(write, b'\t', 1), - current_subject: None, + inner: InnerRdfXmlWriter { + current_subject: None, + }, + } + } + + /// Writes a RDF/XML file to a [`AsyncWrite`] implementation. + /// + /// This writer does unbuffered writes. + /// + /// ``` + /// use oxrdf::{NamedNodeRef, TripleRef}; + /// use oxrdfxml::RdfXmlSerializer; + /// use std::io::Result; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<()> { + /// let mut writer = RdfXmlSerializer::new().serialize_to_tokio_async_write(Vec::new()); + /// writer.write_triple(TripleRef::new( + /// NamedNodeRef::new_unchecked("http://example.com#me"), + /// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + /// NamedNodeRef::new_unchecked("http://schema.org/Person"), + /// )).await?; + /// assert_eq!( + /// b"\n\n\t\n\t\t\n\t\n", + /// writer.finish().await?.as_slice() + /// ); + /// Ok(()) + /// } + /// ``` + #[allow(clippy::unused_self)] + #[cfg(feature = "async-tokio")] + pub fn serialize_to_tokio_async_write( + &self, + write: W, + ) -> ToTokioAsyncWriteRdfXmlWriter { + ToTokioAsyncWriteRdfXmlWriter { + writer: Writer::new_with_indent(write, b'\t', 1), + inner: InnerRdfXmlWriter { + current_subject: None, + }, } } } @@ -81,24 +125,111 @@ impl RdfXmlSerializer { /// ``` pub struct ToWriteRdfXmlWriter { writer: Writer, - current_subject: Option, + inner: InnerRdfXmlWriter, } impl ToWriteRdfXmlWriter { /// Writes an extra triple. #[allow(clippy::match_wildcard_for_single_variants, unreachable_patterns)] pub fn write_triple<'a>(&mut self, t: impl Into>) -> io::Result<()> { + let mut buffer = Vec::new(); + self.inner.write_triple(t, &mut buffer)?; + self.flush_buffer(&mut buffer) + } + + /// Ends the write process and returns the underlying [`Write`]. + pub fn finish(mut self) -> io::Result { + let mut buffer = Vec::new(); + self.inner.finish(&mut buffer); + self.flush_buffer(&mut buffer)?; + Ok(self.writer.into_inner()) + } + + fn flush_buffer(&mut self, buffer: &mut Vec>) -> io::Result<()> { + for event in buffer.drain(0..) { + self.writer.write_event(event).map_err(map_err)?; + } + Ok(()) + } +} + +/// Writes a RDF/XML file to a [`AsyncWrite`] implementation. Can be built using [`RdfXmlSerializer::serialize_to_tokio_async_write`]. +/// +/// ``` +/// use oxrdf::{NamedNodeRef, TripleRef}; +/// use oxrdfxml::RdfXmlSerializer; +/// use std::io::Result; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<()> { +/// let mut writer = RdfXmlSerializer::new().serialize_to_tokio_async_write(Vec::new()); +/// writer.write_triple(TripleRef::new( +/// NamedNodeRef::new_unchecked("http://example.com#me"), +/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), +/// NamedNodeRef::new_unchecked("http://schema.org/Person"), +/// )).await?; +/// assert_eq!( +/// b"\n\n\t\n\t\t\n\t\n", +/// writer.finish().await?.as_slice() +/// ); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteRdfXmlWriter { + writer: Writer, + inner: InnerRdfXmlWriter, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteRdfXmlWriter { + /// Writes an extra triple. + #[allow(clippy::match_wildcard_for_single_variants, unreachable_patterns)] + pub async fn write_triple<'a>(&mut self, t: impl Into>) -> io::Result<()> { + let mut buffer = Vec::new(); + self.inner.write_triple(t, &mut buffer)?; + self.flush_buffer(&mut buffer).await + } + + /// Ends the write process and returns the underlying [`Write`]. + pub async fn finish(mut self) -> io::Result { + let mut buffer = Vec::new(); + self.inner.finish(&mut buffer); + self.flush_buffer(&mut buffer).await?; + Ok(self.writer.into_inner()) + } + + async fn flush_buffer(&mut self, buffer: &mut Vec>) -> io::Result<()> { + for event in buffer.drain(0..) { + self.writer + .write_event_async(event) + .await + .map_err(map_err)?; + } + Ok(()) + } +} + +pub struct InnerRdfXmlWriter { + current_subject: Option, +} + +impl InnerRdfXmlWriter { + #[allow(clippy::match_wildcard_for_single_variants, unreachable_patterns)] + fn write_triple<'a>( + &mut self, + t: impl Into>, + output: &mut Vec>, + ) -> io::Result<()> { if self.current_subject.is_none() { - self.write_start()?; + Self::write_start(output); } let triple = t.into(); // We open a new rdf:Description if useful if self.current_subject.as_ref().map(Subject::as_ref) != Some(triple.subject) { if self.current_subject.is_some() { - self.writer - .write_event(Event::End(BytesEnd::new("rdf:Description"))) - .map_err(map_err)?; + output.push(Event::End(BytesEnd::new("rdf:Description"))); } let mut description_open = BytesStart::new("rdf:Description"); @@ -116,10 +247,9 @@ impl ToWriteRdfXmlWriter { )) } } - self.writer - .write_event(Event::Start(description_open)) - .map_err(map_err)?; + output.push(Event::Start(description_open)); } + self.current_subject = Some(triple.subject.into_owned()); let (prop_prefix, prop_value) = split_iri(triple.predicate.as_str()); let (prop_qname, prop_xmlns) = if prop_value.is_empty() { @@ -127,25 +257,24 @@ impl ToWriteRdfXmlWriter { } else { (prop_value, ("xmlns", prop_prefix)) }; - let property_element = self.writer.create_element(prop_qname); - let property_element = property_element.with_attribute(prop_xmlns); - - match triple.object { - TermRef::NamedNode(node) => property_element - .with_attribute(("rdf:resource", node.as_str())) - .write_empty(), - TermRef::BlankNode(node) => property_element - .with_attribute(("rdf:nodeID", node.as_str())) - .write_empty(), + let mut property_open = BytesStart::new(prop_qname); + property_open.push_attribute(prop_xmlns); + let content = match triple.object { + TermRef::NamedNode(node) => { + property_open.push_attribute(("rdf:resource", node.as_str())); + None + } + TermRef::BlankNode(node) => { + property_open.push_attribute(("rdf:nodeID", node.as_str())); + None + } TermRef::Literal(literal) => { - let property_element = if let Some(language) = literal.language() { - property_element.with_attribute(("xml:lang", language)) + if let Some(language) = literal.language() { + property_open.push_attribute(("xml:lang", language)); } else if !literal.is_plain() { - property_element.with_attribute(("rdf:datatype", literal.datatype().as_str())) - } else { - property_element - }; - property_element.write_text_content(BytesText::new(literal.value())) + property_open.push_attribute(("rdf:datatype", literal.datatype().as_str())); + } + Some(literal.value()) } _ => { return Err(io::Error::new( @@ -153,37 +282,31 @@ impl ToWriteRdfXmlWriter { "RDF/XML only supports named, blank or literal object", )) } + }; + if let Some(content) = content { + output.push(Event::Start(property_open)); + output.push(Event::Text(BytesText::new(content))); + output.push(Event::End(BytesEnd::new(prop_qname))); + } else { + output.push(Event::Empty(property_open)); } - .map_err(map_err)?; - self.current_subject = Some(triple.subject.into_owned()); Ok(()) } - pub fn write_start(&mut self) -> io::Result<()> { - // We open the file - self.writer - .write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None))) - .map_err(map_err)?; + fn write_start(output: &mut Vec>) { + output.push(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None))); let mut rdf_open = BytesStart::new("rdf:RDF"); rdf_open.push_attribute(("xmlns:rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")); - self.writer - .write_event(Event::Start(rdf_open)) - .map_err(map_err) + output.push(Event::Start(rdf_open)) } - /// Ends the write process and returns the underlying [`Write`]. - pub fn finish(mut self) -> io::Result { + fn finish(&self, output: &mut Vec>) { if self.current_subject.is_some() { - self.writer - .write_event(Event::End(BytesEnd::new("rdf:Description"))) - .map_err(map_err)?; + output.push(Event::End(BytesEnd::new("rdf:Description"))); } else { - self.write_start()?; + Self::write_start(output); } - self.writer - .write_event(Event::End(BytesEnd::new("rdf:RDF"))) - .map_err(map_err)?; - Ok(self.writer.into_inner()) + output.push(Event::End(BytesEnd::new("rdf:RDF"))); } } diff --git a/lib/oxttl/Cargo.toml b/lib/oxttl/Cargo.toml index 8717a9b6..06871c96 100644 --- a/lib/oxttl/Cargo.toml +++ b/lib/oxttl/Cargo.toml @@ -16,12 +16,17 @@ rust-version = "1.65" [features] default = [] rdf-star = ["oxrdf/rdf-star"] +async-tokio = ["dep:tokio"] [dependencies] memchr = "2" oxrdf = { version = "0.2.0-alpha.1-dev", path = "../oxrdf" } oxiri = "0.2" oxilangtag = "0.1" +tokio = { version = "1", optional = true, features = ["io-util"] } + +[dev-dependencies] +tokio = { version = "1", features = ["rt", "macros"] } [package.metadata.docs.rs] all-features = true diff --git a/lib/oxttl/src/n3.rs b/lib/oxttl/src/n3.rs index 4a537ef7..106263a2 100644 --- a/lib/oxttl/src/n3.rs +++ b/lib/oxttl/src/n3.rs @@ -1,6 +1,8 @@ //! A [N3](https://w3c.github.io/N3/spec/) streaming parser implemented by [`N3Parser`]. use crate::lexer::{resolve_local_name, N3Lexer, N3LexerMode, N3LexerOptions, N3Token}; +#[cfg(feature = "async-tokio")] +use crate::toolkit::FromTokioAsyncReadIterator; use crate::toolkit::{ FromReadIterator, Lexer, Parser, RuleRecognizer, RuleRecognizerError, SyntaxError, }; @@ -16,6 +18,8 @@ use oxrdf::{ use std::collections::HashMap; use std::fmt; use std::io::Read; +#[cfg(feature = "async-tokio")] +use tokio::io::AsyncRead; /// A N3 term i.e. a RDF `Term` or a `Variable`. #[derive(Eq, PartialEq, Debug, Clone, Hash)] @@ -261,6 +265,47 @@ impl N3Parser { } } + /// Parses a N3 file from a [`AsyncRead`] implementation. + /// + /// Count the number of people: + /// ``` + /// use oxrdf::{NamedNode, vocab::rdf}; + /// use oxttl::n3::{N3Parser, N3Term}; + /// use oxttl::ParseError; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<(), ParseError> { + /// let file = b"@base . + /// @prefix schema: . + /// a schema:Person ; + /// schema:name \"Foo\" . + /// a schema:Person ; + /// schema:name \"Bar\" ."; + /// + /// let rdf_type = N3Term::NamedNode(rdf::TYPE.into_owned()); + /// let schema_person = N3Term::NamedNode(NamedNode::new_unchecked("http://schema.org/Person")); + /// let mut count = 0; + /// let mut parser = N3Parser::new().parse_from_tokio_async_read(file.as_ref()); + /// while let Some(triple) = parser.next().await { + /// let triple = triple?; + /// if triple.predicate == rdf_type && triple.object == schema_person { + /// count += 1; + /// } + /// } + /// assert_eq!(2, count); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn parse_from_tokio_async_read( + &self, + read: R, + ) -> FromTokioAsyncReadN3Reader { + FromTokioAsyncReadN3Reader { + inner: self.parse().parser.parse_from_tokio_async_read(read), + } + } + /// Allows to parse a N3 file by using a low-level API. /// /// Count the number of people: @@ -343,6 +388,50 @@ impl Iterator for FromReadN3Reader { } } +/// Parses a N3 file from a [`AsyncRead`] implementation. Can be built using [`N3Parser::parse_from_tokio_async_read`]. +/// +/// Count the number of people: +/// ``` +/// use oxrdf::{NamedNode, vocab::rdf}; +/// use oxttl::n3::{N3Parser, N3Term}; +/// use oxttl::ParseError; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<(), ParseError> { +/// let file = b"@base . +/// @prefix schema: . +/// a schema:Person ; +/// schema:name \"Foo\" . +/// a schema:Person ; +/// schema:name \"Bar\" ."; +/// +/// let rdf_type = N3Term::NamedNode(rdf::TYPE.into_owned()); +/// let schema_person = N3Term::NamedNode(NamedNode::new_unchecked("http://schema.org/Person")); +/// let mut count = 0; +/// let mut parser = N3Parser::new().parse_from_tokio_async_read(file.as_ref()); +/// while let Some(triple) = parser.next().await { +/// let triple = triple?; +/// if triple.predicate == rdf_type && triple.object == schema_person { +/// count += 1; +/// } +/// } +/// assert_eq!(2, count); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadN3Reader { + inner: FromTokioAsyncReadIterator, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadN3Reader { + /// Reads the next triple or returns `None` if the file is finished. + pub async fn next(&mut self) -> Option> { + Some(self.inner.next().await?.map(Into::into)) + } +} + /// Parses a N3 file by using a low-level API. Can be built using [`N3Parser::parse`]. /// /// Count the number of people: diff --git a/lib/oxttl/src/nquads.rs b/lib/oxttl/src/nquads.rs index 4414d3fc..5cd27dae 100644 --- a/lib/oxttl/src/nquads.rs +++ b/lib/oxttl/src/nquads.rs @@ -1,9 +1,13 @@ //! A [N-Quads](https://www.w3.org/TR/n-quads/) streaming parser implemented by [`NQuadsParser`]. use crate::line_formats::NQuadsRecognizer; +#[cfg(feature = "async-tokio")] +use crate::toolkit::FromTokioAsyncReadIterator; use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError}; use oxrdf::{Quad, QuadRef}; use std::io::{self, Read, Write}; +#[cfg(feature = "async-tokio")] +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; /// A [N-Quads](https://www.w3.org/TR/n-quads/) streaming parser. /// @@ -81,6 +85,43 @@ impl NQuadsParser { } } + /// Parses a N-Quads file from a [`AsyncRead`] implementation. + /// + /// Count the number of people: + /// ``` + /// use oxrdf::{NamedNodeRef, vocab::rdf}; + /// use oxttl::{ParseError, NQuadsParser}; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<(), ParseError> { + /// let file = b" . + /// \"Foo\" . + /// . + /// \"Bar\" ."; + /// + /// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); + /// let mut count = 0; + /// let mut parser = NQuadsParser::new().parse_from_tokio_async_read(file.as_ref()); + /// while let Some(triple) = parser.next().await { + /// let triple = triple?; + /// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { + /// count += 1; + /// } + /// } + /// assert_eq!(2, count); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn parse_from_tokio_async_read( + &self, + read: R, + ) -> FromTokioAsyncReadNQuadsReader { + FromTokioAsyncReadNQuadsReader { + inner: self.parse().parser.parse_from_tokio_async_read(read), + } + } + /// Allows to parse a N-Quads file by using a low-level API. /// /// Count the number of people: @@ -164,6 +205,46 @@ impl Iterator for FromReadNQuadsReader { } } +/// Parses a N-Quads file from a [`AsyncRead`] implementation. Can be built using [`NQuadsParser::parse_from_tokio_async_read`]. +/// +/// Count the number of people: +/// ``` +/// use oxrdf::{NamedNodeRef, vocab::rdf}; +/// use oxttl::{ParseError, NQuadsParser}; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<(), ParseError> { +/// let file = b" . +/// \"Foo\" . +/// . +/// \"Bar\" ."; +/// +/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); +/// let mut count = 0; +/// let mut parser = NQuadsParser::new().parse_from_tokio_async_read(file.as_ref()); +/// while let Some(triple) = parser.next().await { +/// let triple = triple?; +/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { +/// count += 1; +/// } +/// } +/// assert_eq!(2, count); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadNQuadsReader { + inner: FromTokioAsyncReadIterator, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadNQuadsReader { + /// Reads the next triple or returns `None` if the file is finished. + pub async fn next(&mut self) -> Option> { + Some(self.inner.next().await?.map(Into::into)) + } +} + /// Parses a N-Quads file by using a low-level API. Can be built using [`NQuadsParser::parse`]. /// /// Count the number of people: @@ -288,6 +369,41 @@ impl NQuadsSerializer { } } + /// Writes a N-Quads file to a [`AsyncWrite`] implementation. + /// + /// ``` + /// use oxrdf::{NamedNodeRef, QuadRef}; + /// use oxttl::NQuadsSerializer; + /// use std::io::Result; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<()> { + /// let mut writer = NQuadsSerializer::new().serialize_to_tokio_async_write(Vec::new()); + /// writer.write_quad(QuadRef::new( + /// NamedNodeRef::new_unchecked("http://example.com#me"), + /// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + /// NamedNodeRef::new_unchecked("http://schema.org/Person"), + /// NamedNodeRef::new_unchecked("http://example.com"), + /// )).await?; + /// assert_eq!( + /// b" .\n", + /// writer.finish().as_slice() + /// ); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn serialize_to_tokio_async_write( + &self, + write: W, + ) -> ToTokioAsyncWriteNQuadsWriter { + ToTokioAsyncWriteNQuadsWriter { + write, + writer: self.serialize(), + buffer: Vec::new(), + } + } + /// Builds a low-level N-Quads writer. /// /// ``` @@ -350,6 +466,52 @@ impl ToWriteNQuadsWriter { } } +/// Writes a N-Quads file to a [`AsyncWrite`] implementation. Can be built using [`NQuadsSerializer::serialize_to_tokio_async_write`]. +/// +/// ``` +/// use oxrdf::{NamedNodeRef, QuadRef}; +/// use oxttl::NQuadsSerializer; +/// use std::io::Result; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<()> { +/// let mut writer = NQuadsSerializer::new().serialize_to_tokio_async_write(Vec::new()); +/// writer.write_quad(QuadRef::new( +/// NamedNodeRef::new_unchecked("http://example.com#me"), +/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), +/// NamedNodeRef::new_unchecked("http://schema.org/Person"), +/// NamedNodeRef::new_unchecked("http://example.com"), +/// )).await?; +/// assert_eq!( +/// b" .\n", +/// writer.finish().as_slice() +/// ); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteNQuadsWriter { + write: W, + writer: LowLevelNQuadsWriter, + buffer: Vec, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteNQuadsWriter { + /// Writes an extra quad. + pub async fn write_quad<'a>(&mut self, q: impl Into>) -> io::Result<()> { + self.writer.write_quad(q, &mut self.buffer)?; + self.write.write_all(&self.buffer).await?; + self.buffer.clear(); + Ok(()) + } + + /// Ends the write process and returns the underlying [`Write`]. + pub fn finish(self) -> W { + self.write + } +} + /// Writes a N-Quads file by using a low-level API. Can be built using [`NQuadsSerializer::serialize`]. /// /// ``` diff --git a/lib/oxttl/src/ntriples.rs b/lib/oxttl/src/ntriples.rs index cac8cd22..2278db81 100644 --- a/lib/oxttl/src/ntriples.rs +++ b/lib/oxttl/src/ntriples.rs @@ -2,9 +2,13 @@ //! and a serializer implemented by [`NTriplesSerializer`]. use crate::line_formats::NQuadsRecognizer; +#[cfg(feature = "async-tokio")] +use crate::toolkit::FromTokioAsyncReadIterator; use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError}; use oxrdf::{Triple, TripleRef}; use std::io::{self, Read, Write}; +#[cfg(feature = "async-tokio")] +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; /// A [N-Triples](https://www.w3.org/TR/n-triples/) streaming parser. /// @@ -82,6 +86,43 @@ impl NTriplesParser { } } + /// Parses a N-Triples file from a [`AsyncRead`] implementation. + /// + /// Count the number of people: + /// ``` + /// use oxrdf::{NamedNodeRef, vocab::rdf}; + /// use oxttl::{ParseError, NTriplesParser}; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<(), ParseError> { + /// let file = b" . + /// \"Foo\" . + /// . + /// \"Bar\" ."; + /// + /// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); + /// let mut count = 0; + /// let mut parser = NTriplesParser::new().parse_from_tokio_async_read(file.as_ref()); + /// while let Some(triple) = parser.next().await { + /// let triple = triple?; + /// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { + /// count += 1; + /// } + /// } + /// assert_eq!(2, count); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn parse_from_tokio_async_read( + &self, + read: R, + ) -> FromTokioAsyncReadNTriplesReader { + FromTokioAsyncReadNTriplesReader { + inner: self.parse().parser.parse_from_tokio_async_read(read), + } + } + /// Allows to parse a N-Triples file by using a low-level API. /// /// Count the number of people: @@ -165,6 +206,46 @@ impl Iterator for FromReadNTriplesReader { } } +/// Parses a N-Triples file from a [`AsyncRead`] implementation. Can be built using [`NTriplesParser::parse_from_tokio_async_read`]. +/// +/// Count the number of people: +/// ``` +/// use oxrdf::{NamedNodeRef, vocab::rdf}; +/// use oxttl::{ParseError, NTriplesParser}; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<(), ParseError> { +/// let file = b" . +/// \"Foo\" . +/// . +/// \"Bar\" ."; +/// +/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); +/// let mut count = 0; +/// let mut parser = NTriplesParser::new().parse_from_tokio_async_read(file.as_ref()); +/// while let Some(triple) = parser.next().await { +/// let triple = triple?; +/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { +/// count += 1; +/// } +/// } +/// assert_eq!(2, count); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadNTriplesReader { + inner: FromTokioAsyncReadIterator, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadNTriplesReader { + /// Reads the next triple or returns `None` if the file is finished. + pub async fn next(&mut self) -> Option> { + Some(self.inner.next().await?.map(Into::into)) + } +} + /// Parses a N-Triples file by using a low-level API. Can be built using [`NTriplesParser::parse`]. /// /// Count the number of people: @@ -287,6 +368,40 @@ impl NTriplesSerializer { } } + /// Writes a N-Triples file to a [`AsyncWrite`] implementation. + /// + /// ``` + /// use oxrdf::{NamedNodeRef, TripleRef}; + /// use oxttl::NTriplesSerializer; + /// use std::io::Result; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<()> { + /// let mut writer = NTriplesSerializer::new().serialize_to_tokio_async_write(Vec::new()); + /// writer.write_triple(TripleRef::new( + /// NamedNodeRef::new_unchecked("http://example.com#me"), + /// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + /// NamedNodeRef::new_unchecked("http://schema.org/Person"), + /// )).await?; + /// assert_eq!( + /// b" .\n", + /// writer.finish().as_slice() + /// ); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn serialize_to_tokio_async_write( + &self, + write: W, + ) -> ToTokioAsyncWriteNTriplesWriter { + ToTokioAsyncWriteNTriplesWriter { + write, + writer: self.serialize(), + buffer: Vec::new(), + } + } + /// Builds a low-level N-Triples writer. /// /// ``` @@ -347,6 +462,51 @@ impl ToWriteNTriplesWriter { } } +/// Writes a N-Triples file to a [`AsyncWrite`] implementation. Can be built using [`NTriplesSerializer::serialize_to_tokio_async_write`]. +/// +/// ``` +/// use oxrdf::{NamedNodeRef, TripleRef}; +/// use oxttl::NTriplesSerializer; +/// use std::io::Result; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<()> { +/// let mut writer = NTriplesSerializer::new().serialize_to_tokio_async_write(Vec::new()); +/// writer.write_triple(TripleRef::new( +/// NamedNodeRef::new_unchecked("http://example.com#me"), +/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), +/// NamedNodeRef::new_unchecked("http://schema.org/Person") +/// )).await?; +/// assert_eq!( +/// b" .\n", +/// writer.finish().as_slice() +/// ); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteNTriplesWriter { + write: W, + writer: LowLevelNTriplesWriter, + buffer: Vec, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteNTriplesWriter { + /// Writes an extra triple. + pub async fn write_triple<'a>(&mut self, t: impl Into>) -> io::Result<()> { + self.writer.write_triple(t, &mut self.buffer)?; + self.write.write_all(&self.buffer).await?; + self.buffer.clear(); + Ok(()) + } + + /// Ends the write process and returns the underlying [`Write`]. + pub fn finish(self) -> W { + self.write + } +} + /// Writes a N-Triples file by using a low-level API. Can be built using [`NTriplesSerializer::serialize`]. /// /// ``` diff --git a/lib/oxttl/src/toolkit/lexer.rs b/lib/oxttl/src/toolkit/lexer.rs index 3510ab28..664e35fe 100644 --- a/lib/oxttl/src/toolkit/lexer.rs +++ b/lib/oxttl/src/toolkit/lexer.rs @@ -3,6 +3,8 @@ use std::error::Error; use std::fmt; use std::io::{self, Read}; use std::ops::{Range, RangeInclusive}; +#[cfg(feature = "async-tokio")] +use tokio::io::{AsyncRead, AsyncReadExt}; pub trait TokenRecognizer { type Token<'a> @@ -122,6 +124,35 @@ impl Lexer { Ok(()) } + #[cfg(feature = "async-tokio")] + pub async fn extend_from_tokio_async_read( + &mut self, + read: &mut (impl AsyncRead + Unpin), + ) -> io::Result<()> { + self.shrink_if_useful(); + let min_end = self.end + self.min_buffer_size; + if min_end > self.max_buffer_size { + return Err(io::Error::new( + io::ErrorKind::OutOfMemory, + format!( + "The buffer maximal size is {} < {min_end}", + self.max_buffer_size + ), + )); + } + if self.data.len() < min_end { + self.data.resize(min_end, 0); + } + if self.data.len() < self.data.capacity() { + // We keep extending to have as much space as available without reallocation + self.data.resize(self.data.capacity(), 0); + } + let read = read.read(&mut self.data[self.end..]).await?; + self.end += read; + self.is_ending = read == 0; + Ok(()) + } + pub fn read_next( &mut self, options: &R::Options, diff --git a/lib/oxttl/src/toolkit/mod.rs b/lib/oxttl/src/toolkit/mod.rs index 39f9d40d..300b9c2c 100644 --- a/lib/oxttl/src/toolkit/mod.rs +++ b/lib/oxttl/src/toolkit/mod.rs @@ -6,6 +6,8 @@ mod lexer; mod parser; pub use self::lexer::{Lexer, LexerError, TokenRecognizer, TokenRecognizerError}; +#[cfg(feature = "async-tokio")] +pub use self::parser::FromTokioAsyncReadIterator; pub use self::parser::{ FromReadIterator, ParseError, Parser, RuleRecognizer, RuleRecognizerError, SyntaxError, }; diff --git a/lib/oxttl/src/toolkit/parser.rs b/lib/oxttl/src/toolkit/parser.rs index 44c01d5a..c5808199 100644 --- a/lib/oxttl/src/toolkit/parser.rs +++ b/lib/oxttl/src/toolkit/parser.rs @@ -4,6 +4,8 @@ use std::error::Error; use std::io::Read; use std::ops::Range; use std::{fmt, io}; +#[cfg(feature = "async-tokio")] +use tokio::io::AsyncRead; pub trait RuleRecognizer: Sized { type TokenRecognizer: TokenRecognizer; @@ -114,6 +116,14 @@ impl Parser { pub fn parse_from_read(self, read: R) -> FromReadIterator { FromReadIterator { read, parser: self } } + + #[cfg(feature = "async-tokio")] + pub fn parse_from_tokio_async_read( + self, + read: R, + ) -> FromTokioAsyncReadIterator { + FromTokioAsyncReadIterator { read, parser: self } + } } /// An error in the syntax of the parsed file. @@ -258,3 +268,29 @@ impl Iterator for FromReadIterator { None } } + +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadIterator { + read: R, + parser: Parser, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadIterator { + pub async fn next(&mut self) -> Option> { + while !self.parser.is_end() { + if let Some(result) = self.parser.read_next() { + return Some(result.map_err(ParseError::Syntax)); + } + if let Err(e) = self + .parser + .lexer + .extend_from_tokio_async_read(&mut self.read) + .await + { + return Some(Err(e.into())); + } + } + None + } +} diff --git a/lib/oxttl/src/trig.rs b/lib/oxttl/src/trig.rs index 62fc331f..5ad21402 100644 --- a/lib/oxttl/src/trig.rs +++ b/lib/oxttl/src/trig.rs @@ -1,12 +1,16 @@ //! A [TriG](https://www.w3.org/TR/trig/) streaming parser implemented by [`TriGParser`]. use crate::terse::TriGRecognizer; +#[cfg(feature = "async-tokio")] +use crate::toolkit::FromTokioAsyncReadIterator; use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError}; use oxiri::{Iri, IriParseError}; use oxrdf::{vocab::xsd, GraphName, NamedNode, Quad, QuadRef, Subject, TermRef}; use std::collections::HashMap; use std::fmt; use std::io::{self, Read, Write}; +#[cfg(feature = "async-tokio")] +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; /// A [TriG](https://www.w3.org/TR/trig/) streaming parser. /// @@ -107,6 +111,45 @@ impl TriGParser { } } + /// Parses a TriG file from a [`AsyncRead`] implementation. + /// + /// Count the number of people: + /// ``` + /// use oxrdf::{NamedNodeRef, vocab::rdf}; + /// use oxttl::{ParseError, TriGParser}; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<(), ParseError> { + /// let file = b"@base . + /// @prefix schema: . + /// a schema:Person ; + /// schema:name \"Foo\" . + /// a schema:Person ; + /// schema:name \"Bar\" ."; + /// + /// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); + /// let mut count = 0; + /// let mut parser = TriGParser::new().parse_from_tokio_async_read(file.as_ref()); + /// while let Some(triple) = parser.next().await { + /// let triple = triple?; + /// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { + /// count += 1; + /// } + /// } + /// assert_eq!(2, count); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn parse_from_tokio_async_read( + &self, + read: R, + ) -> FromTokioAsyncReadTriGReader { + FromTokioAsyncReadTriGReader { + inner: self.parse().parser.parse_from_tokio_async_read(read), + } + } + /// Allows to parse a TriG file by using a low-level API. /// /// Count the number of people: @@ -193,6 +236,48 @@ impl Iterator for FromReadTriGReader { } } +/// Parses a TriG file from a [`AsyncRead`] implementation. Can be built using [`TriGParser::parse_from_tokio_async_read`]. +/// +/// Count the number of people: +/// ``` +/// use oxrdf::{NamedNodeRef, vocab::rdf}; +/// use oxttl::{ParseError, TriGParser}; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<(), ParseError> { +/// let file = b"@base . +/// @prefix schema: . +/// a schema:Person ; +/// schema:name \"Foo\" . +/// a schema:Person ; +/// schema:name \"Bar\" ."; +/// +/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); +/// let mut count = 0; +/// let mut parser = TriGParser::new().parse_from_tokio_async_read(file.as_ref()); +/// while let Some(triple) = parser.next().await { +/// let triple = triple?; +/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { +/// count += 1; +/// } +/// } +/// assert_eq!(2, count); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadTriGReader { + inner: FromTokioAsyncReadIterator, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadTriGReader { + /// Reads the next triple or returns `None` if the file is finished. + pub async fn next(&mut self) -> Option> { + Some(self.inner.next().await?.map(Into::into)) + } +} + /// Parses a TriG file by using a low-level API. Can be built using [`TriGParser::parse`]. /// /// Count the number of people: @@ -317,6 +402,41 @@ impl TriGSerializer { } } + /// Writes a TriG file to a [`AsyncWrite`] implementation. + /// + /// ``` + /// use oxrdf::{NamedNodeRef, QuadRef}; + /// use oxttl::TriGSerializer; + /// use std::io::Result; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<()> { + /// let mut writer = TriGSerializer::new().serialize_to_tokio_async_write(Vec::new()); + /// writer.write_quad(QuadRef::new( + /// NamedNodeRef::new_unchecked("http://example.com#me"), + /// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + /// NamedNodeRef::new_unchecked("http://schema.org/Person"), + /// NamedNodeRef::new_unchecked("http://example.com"), + /// )).await?; + /// assert_eq!( + /// b" {\n\t .\n}\n", + /// writer.finish().await?.as_slice() + /// ); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn serialize_to_tokio_async_write( + &self, + write: W, + ) -> ToTokioAsyncWriteTriGWriter { + ToTokioAsyncWriteTriGWriter { + write, + writer: self.serialize(), + buffer: Vec::new(), + } + } + /// Builds a low-level TriG writer. /// /// ``` @@ -384,6 +504,55 @@ impl ToWriteTriGWriter { } } +/// Writes a TriG file to a [`AsyncWrite`] implementation. Can be built using [`TriGSerializer::serialize_to_tokio_async_write`]. +/// +/// ``` +/// use oxrdf::{NamedNodeRef, QuadRef}; +/// use oxttl::TriGSerializer; +/// use std::io::Result; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<()> { +/// let mut writer = TriGSerializer::new().serialize_to_tokio_async_write(Vec::new()); +/// writer.write_quad(QuadRef::new( +/// NamedNodeRef::new_unchecked("http://example.com#me"), +/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), +/// NamedNodeRef::new_unchecked("http://schema.org/Person"), +/// NamedNodeRef::new_unchecked("http://example.com"), +/// )).await?; +/// assert_eq!( +/// b" {\n\t .\n}\n", +/// writer.finish().await?.as_slice() +/// ); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteTriGWriter { + write: W, + writer: LowLevelTriGWriter, + buffer: Vec, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteTriGWriter { + /// Writes an extra quad. + pub async fn write_quad<'a>(&mut self, q: impl Into>) -> io::Result<()> { + self.writer.write_quad(q, &mut self.buffer)?; + self.write.write_all(&self.buffer).await?; + self.buffer.clear(); + Ok(()) + } + + /// Ends the write process and returns the underlying [`Write`]. + pub async fn finish(mut self) -> io::Result { + self.writer.finish(&mut self.buffer)?; + self.write.write_all(&self.buffer).await?; + self.buffer.clear(); + Ok(self.write) + } +} + /// Writes a TriG file by using a low-level API. Can be built using [`TriGSerializer::serialize`]. /// /// ``` diff --git a/lib/oxttl/src/turtle.rs b/lib/oxttl/src/turtle.rs index 3b5639c0..0c875b8a 100644 --- a/lib/oxttl/src/turtle.rs +++ b/lib/oxttl/src/turtle.rs @@ -1,13 +1,18 @@ //! A [Turtle](https://www.w3.org/TR/turtle/) streaming parser implemented by [`TurtleParser`]. use crate::terse::TriGRecognizer; +#[cfg(feature = "async-tokio")] +use crate::toolkit::FromTokioAsyncReadIterator; use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError}; -use crate::trig::{LowLevelTriGWriter, ToWriteTriGWriter}; -use crate::TriGSerializer; +#[cfg(feature = "async-tokio")] +use crate::trig::ToTokioAsyncWriteTriGWriter; +use crate::trig::{LowLevelTriGWriter, ToWriteTriGWriter, TriGSerializer}; use oxiri::{Iri, IriParseError}; use oxrdf::{GraphNameRef, Triple, TripleRef}; use std::collections::HashMap; use std::io::{self, Read, Write}; +#[cfg(feature = "async-tokio")] +use tokio::io::{AsyncRead, AsyncWrite}; /// A [Turtle](https://www.w3.org/TR/turtle/) streaming parser. /// @@ -108,6 +113,45 @@ impl TurtleParser { } } + /// Parses a Turtle file from a [`AsyncRead`] implementation. + /// + /// Count the number of people: + /// ``` + /// use oxrdf::{NamedNodeRef, vocab::rdf}; + /// use oxttl::{ParseError, TurtleParser}; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<(), ParseError> { + /// let file = b"@base . + /// @prefix schema: . + /// a schema:Person ; + /// schema:name \"Foo\" . + /// a schema:Person ; + /// schema:name \"Bar\" ."; + /// + /// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); + /// let mut count = 0; + /// let mut parser = TurtleParser::new().parse_from_tokio_async_read(file.as_ref()); + /// while let Some(triple) = parser.next().await { + /// let triple = triple?; + /// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { + /// count += 1; + /// } + /// } + /// assert_eq!(2, count); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn parse_from_tokio_async_read( + &self, + read: R, + ) -> FromTokioAsyncReadTurtleReader { + FromTokioAsyncReadTurtleReader { + inner: self.parse().parser.parse_from_tokio_async_read(read), + } + } + /// Allows to parse a Turtle file by using a low-level API. /// /// Count the number of people: @@ -194,6 +238,48 @@ impl Iterator for FromReadTurtleReader { } } +/// Parses a Turtle file from a [`AsyncRead`] implementation. Can be built using [`TurtleParser::parse_from_tokio_async_read`]. +/// +/// Count the number of people: +/// ``` +/// use oxrdf::{NamedNodeRef, vocab::rdf}; +/// use oxttl::{ParseError, TurtleParser}; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<(), ParseError> { +/// let file = b"@base . +/// @prefix schema: . +/// a schema:Person ; +/// schema:name \"Foo\" . +/// a schema:Person ; +/// schema:name \"Bar\" ."; +/// +/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person"); +/// let mut count = 0; +/// let mut parser = TurtleParser::new().parse_from_tokio_async_read(file.as_ref()); +/// while let Some(triple) = parser.next().await { +/// let triple = triple?; +/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() { +/// count += 1; +/// } +/// } +/// assert_eq!(2, count); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct FromTokioAsyncReadTurtleReader { + inner: FromTokioAsyncReadIterator, +} + +#[cfg(feature = "async-tokio")] +impl FromTokioAsyncReadTurtleReader { + /// Reads the next triple or returns `None` if the file is finished. + pub async fn next(&mut self) -> Option> { + Some(self.inner.next().await?.map(Into::into)) + } +} + /// Parses a Turtle file by using a low-level API. Can be built using [`TurtleParser::parse`]. /// /// Count the number of people: @@ -317,6 +403,38 @@ impl TurtleSerializer { } } + /// Writes a Turtle file to a [`AsyncWrite`] implementation. + /// + /// ``` + /// use oxrdf::{NamedNodeRef, TripleRef}; + /// use oxttl::TurtleSerializer; + /// use std::io::Result; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() -> Result<()> { + /// let mut writer = TurtleSerializer::new().serialize_to_tokio_async_write(Vec::new()); + /// writer.write_triple(TripleRef::new( + /// NamedNodeRef::new_unchecked("http://example.com#me"), + /// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), + /// NamedNodeRef::new_unchecked("http://schema.org/Person"), + /// )).await?; + /// assert_eq!( + /// b" .\n", + /// writer.finish().await?.as_slice() + /// ); + /// Ok(()) + /// } + /// ``` + #[cfg(feature = "async-tokio")] + pub fn serialize_to_tokio_async_write( + &self, + write: W, + ) -> ToTokioAsyncWriteTurtleWriter { + ToTokioAsyncWriteTurtleWriter { + inner: self.inner.serialize_to_tokio_async_write(write), + } + } + /// Builds a low-level Turtle writer. /// /// ``` @@ -379,6 +497,48 @@ impl ToWriteTurtleWriter { } } +/// Writes a Turtle file to a [`AsyncWrite`] implementation. Can be built using [`TurtleSerializer::serialize_to_tokio_async_write`]. +/// +/// ``` +/// use oxrdf::{NamedNodeRef, TripleRef}; +/// use oxttl::TurtleSerializer; +/// use std::io::Result; +/// +/// #[tokio::main(flavor = "current_thread")] +/// async fn main() -> Result<()> { +/// let mut writer = TurtleSerializer::new().serialize_to_tokio_async_write(Vec::new()); +/// writer.write_triple(TripleRef::new( +/// NamedNodeRef::new_unchecked("http://example.com#me"), +/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), +/// NamedNodeRef::new_unchecked("http://schema.org/Person") +/// )).await?; +/// assert_eq!( +/// b" .\n", +/// writer.finish().await?.as_slice() +/// ); +/// Ok(()) +/// } +/// ``` +#[cfg(feature = "async-tokio")] +pub struct ToTokioAsyncWriteTurtleWriter { + inner: ToTokioAsyncWriteTriGWriter, +} + +#[cfg(feature = "async-tokio")] +impl ToTokioAsyncWriteTurtleWriter { + /// Writes an extra triple. + pub async fn write_triple<'a>(&mut self, t: impl Into>) -> io::Result<()> { + self.inner + .write_quad(t.into().in_graph(GraphNameRef::DefaultGraph)) + .await + } + + /// Ends the write process and returns the underlying [`Write`]. + pub async fn finish(self) -> io::Result { + self.inner.finish().await + } +} + /// Writes a Turtle file by using a low-level API. Can be built using [`TurtleSerializer::serialize`]. /// /// ```