RDF I/O: adds basic Tokio support

pull/572/head
Tpt 2 years ago committed by Thomas Tanon
parent 501f9ce6f9
commit cdabe52847
  1. 39
      Cargo.lock
  2. 8
      lib/oxrdfxml/Cargo.toml
  3. 6
      lib/oxrdfxml/src/lib.rs
  4. 135
      lib/oxrdfxml/src/parser.rs
  5. 215
      lib/oxrdfxml/src/serializer.rs
  6. 5
      lib/oxttl/Cargo.toml
  7. 89
      lib/oxttl/src/n3.rs
  8. 162
      lib/oxttl/src/nquads.rs
  9. 160
      lib/oxttl/src/ntriples.rs
  10. 31
      lib/oxttl/src/toolkit/lexer.rs
  11. 2
      lib/oxttl/src/toolkit/mod.rs
  12. 36
      lib/oxttl/src/toolkit/parser.rs
  13. 169
      lib/oxttl/src/trig.rs
  14. 164
      lib/oxttl/src/turtle.rs

39
Cargo.lock generated

@ -194,6 +194,12 @@ version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]]
name = "bytes"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cast"
version = "0.3.0"
@ -1038,6 +1044,7 @@ dependencies = [
"oxiri",
"oxrdf",
"quick-xml",
"tokio",
]
[[package]]
@ -1064,6 +1071,7 @@ dependencies = [
"oxilangtag",
"oxiri",
"oxrdf",
"tokio",
]
[[package]]
@ -1128,6 +1136,12 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pkg-config"
version = "0.3.27"
@ -1293,6 +1307,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81b9228215d82c7b61490fec1de287136b5de6f5700f6e58ea9ad61a7964ca51"
dependencies = [
"memchr",
"tokio",
]
[[package]]
@ -1785,6 +1800,30 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
dependencies = [
"autocfg",
"bytes",
"pin-project-lite",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.18",
]
[[package]]
name = "typenum"
version = "1.16.0"

@ -13,11 +13,19 @@ Parser for the RDF/XML language
edition = "2021"
rust-version = "1.65"
[features]
default = []
async-tokio = ["dep:tokio", "quick-xml/async-tokio"]
[dependencies]
oxrdf = { version = "0.2.0-alpha.1-dev", path = "../oxrdf" }
oxilangtag = "0.1"
oxiri = "0.2"
quick-xml = "0.29"
tokio = { version = "1", optional = true, features = ["io-util"] }
[dev-dependencies]
tokio = { version = "1", features = ["rt", "macros"] }
[package.metadata.docs.rs]
all-features = true

@ -9,6 +9,10 @@ mod parser;
mod serializer;
mod utils;
pub use crate::serializer::{RdfXmlSerializer, ToWriteRdfXmlWriter};
pub use error::{ParseError, SyntaxError};
#[cfg(feature = "async-tokio")]
pub use parser::FromTokioAsyncReadRdfXmlReader;
pub use parser::{FromReadRdfXmlReader, RdfXmlParser};
#[cfg(feature = "async-tokio")]
pub use serializer::ToTokioAsyncWriteRdfXmlWriter;
pub use serializer::{RdfXmlSerializer, ToWriteRdfXmlWriter};

@ -10,8 +10,10 @@ use quick_xml::events::*;
use quick_xml::name::{LocalName, QName, ResolveResult};
use quick_xml::{NsReader, Writer};
use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader, Read};
use std::io::{BufReader, Read};
use std::str;
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, BufReader as AsyncBufReader};
/// A [RDF/XML](https://www.w3.org/TR/rdf-syntax-grammar/) streaming parser.
///
@ -93,11 +95,60 @@ impl RdfXmlParser {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn parse_from_read<R: Read>(&self, read: R) -> FromReadRdfXmlReader<R> {
let mut reader = NsReader::from_reader(BufReader::new(read));
reader.expand_empty_elements(true);
FromReadRdfXmlReader {
results: Vec::new(),
reader: RdfXmlReader {
reader: self.parse(BufReader::new(read)),
reader_buffer: Vec::default(),
}
}
/// Parses a RDF/XML file from a [`AsyncRead`] implementation.
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxrdfxml::{RdfXmlParser, ParseError};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"<?xml version=\"1.0\"?>
/// <rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" xmlns:schema=\"http://schema.org/\">
/// <rdf:Description rdf:about=\"http://example.com/foo\">
/// <rdf:type rdf:resource=\"http://schema.org/Person\" />
/// <schema:name>Foo</schema:name>
/// </rdf:Description>
/// <schema:Person rdf:about=\"http://example.com/bar\" schema:name=\"Bar\" />
/// </rdf:RDF>";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = RdfXmlParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn parse_from_tokio_async_read<R: AsyncRead + Unpin>(
&self,
read: R,
) -> FromTokioAsyncReadRdfXmlReader<R> {
FromTokioAsyncReadRdfXmlReader {
results: Vec::new(),
reader: self.parse(AsyncBufReader::new(read)),
reader_buffer: Vec::default(),
}
}
fn parse<T>(&self, reader: T) -> RdfXmlReader<T> {
let mut reader = NsReader::from_reader(reader);
reader.expand_empty_elements(true);
RdfXmlReader {
reader,
state: vec![RdfXmlState::Doc {
base_iri: self.base.clone(),
@ -106,8 +157,6 @@ impl RdfXmlParser {
in_literal_depth: 0,
known_rdf_id: HashSet::default(),
is_end: false,
},
reader_buffer: Vec::default(),
}
}
}
@ -178,6 +227,76 @@ impl<R: Read> FromReadRdfXmlReader<R> {
}
}
/// Parses a RDF/XML file from a [`AsyncRead`] implementation. Can be built using [`RdfXmlParser::parse_from_tokio_async_read`].
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxrdfxml::{RdfXmlParser, ParseError};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"<?xml version=\"1.0\"?>
/// <rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" xmlns:schema=\"http://schema.org/\">
/// <rdf:Description rdf:about=\"http://example.com/foo\">
/// <rdf:type rdf:resource=\"http://schema.org/Person\" />
/// <schema:name>Foo</schema:name>
/// </rdf:Description>
/// <schema:Person rdf:about=\"http://example.com/bar\" schema:name=\"Bar\" />
/// </rdf:RDF>";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = RdfXmlParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadRdfXmlReader<R: AsyncRead + Unpin> {
results: Vec<Triple>,
reader: RdfXmlReader<AsyncBufReader<R>>,
reader_buffer: Vec<u8>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadRdfXmlReader<R> {
/// Reads the next triple or returns `None` if the file is finished.
pub async fn next(&mut self) -> Option<Result<Triple, ParseError>> {
loop {
if let Some(triple) = self.results.pop() {
return Some(Ok(triple));
} else if self.reader.is_end {
return None;
}
if let Err(e) = self.parse_step().await {
return Some(Err(e));
}
}
}
/// The current byte position in the input data.
pub fn buffer_position(&self) -> usize {
self.reader.reader.buffer_position()
}
async fn parse_step(&mut self) -> Result<(), ParseError> {
self.reader_buffer.clear();
let event = self
.reader
.reader
.read_event_into_async(&mut self.reader_buffer)
.await?;
self.reader.parse_event(event, &mut self.results)
}
}
const RDF_ABOUT: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#about";
const RDF_ABOUT_EACH: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#aboutEach";
const RDF_ABOUT_EACH_PREFIX: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#aboutEachPrefix";
@ -285,7 +404,7 @@ impl RdfXmlState {
}
}
struct RdfXmlReader<R: BufRead> {
struct RdfXmlReader<R> {
reader: NsReader<R>,
state: Vec<RdfXmlState>,
custom_entities: HashMap<String, String>,
@ -294,7 +413,7 @@ struct RdfXmlReader<R: BufRead> {
is_end: bool,
}
impl<R: BufRead> RdfXmlReader<R> {
impl<R> RdfXmlReader<R> {
fn parse_event(&mut self, event: Event, results: &mut Vec<Triple>) -> Result<(), ParseError> {
match event {
Event::Start(event) => self.parse_start_event(&event, results),

@ -5,6 +5,8 @@ use quick_xml::Writer;
use std::io;
use std::io::Write;
use std::sync::Arc;
#[cfg(feature = "async-tokio")]
use tokio::io::AsyncWrite;
/// A [RDF/XML](https://www.w3.org/TR/rdf-syntax-grammar/) serializer.
///
@ -34,7 +36,9 @@ impl RdfXmlSerializer {
Self
}
/// Writes a RdfXml file to a [`Write`] implementation.
/// Writes a RDF/XML file to a [`Write`] implementation.
///
/// This writer does unbuffered writes.
///
/// ```
/// use oxrdf::{NamedNodeRef, TripleRef};
@ -56,7 +60,47 @@ impl RdfXmlSerializer {
pub fn serialize_to_write<W: Write>(&self, write: W) -> ToWriteRdfXmlWriter<W> {
ToWriteRdfXmlWriter {
writer: Writer::new_with_indent(write, b'\t', 1),
inner: InnerRdfXmlWriter {
current_subject: None,
},
}
}
/// Writes a RDF/XML file to a [`AsyncWrite`] implementation.
///
/// This writer does unbuffered writes.
///
/// ```
/// use oxrdf::{NamedNodeRef, TripleRef};
/// use oxrdfxml::RdfXmlSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = RdfXmlSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_triple(TripleRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// )).await?;
/// assert_eq!(
/// b"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\">\n\t<rdf:Description rdf:about=\"http://example.com#me\">\n\t\t<type xmlns=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" rdf:resource=\"http://schema.org/Person\"/>\n\t</rdf:Description>\n</rdf:RDF>",
/// writer.finish().await?.as_slice()
/// );
/// Ok(())
/// }
/// ```
#[allow(clippy::unused_self)]
#[cfg(feature = "async-tokio")]
pub fn serialize_to_tokio_async_write<W: AsyncWrite + Unpin>(
&self,
write: W,
) -> ToTokioAsyncWriteRdfXmlWriter<W> {
ToTokioAsyncWriteRdfXmlWriter {
writer: Writer::new_with_indent(write, b'\t', 1),
inner: InnerRdfXmlWriter {
current_subject: None,
},
}
}
}
@ -81,24 +125,111 @@ impl RdfXmlSerializer {
/// ```
pub struct ToWriteRdfXmlWriter<W: Write> {
writer: Writer<W>,
current_subject: Option<Subject>,
inner: InnerRdfXmlWriter,
}
impl<W: Write> ToWriteRdfXmlWriter<W> {
/// Writes an extra triple.
#[allow(clippy::match_wildcard_for_single_variants, unreachable_patterns)]
pub fn write_triple<'a>(&mut self, t: impl Into<TripleRef<'a>>) -> io::Result<()> {
let mut buffer = Vec::new();
self.inner.write_triple(t, &mut buffer)?;
self.flush_buffer(&mut buffer)
}
/// Ends the write process and returns the underlying [`Write`].
pub fn finish(mut self) -> io::Result<W> {
let mut buffer = Vec::new();
self.inner.finish(&mut buffer);
self.flush_buffer(&mut buffer)?;
Ok(self.writer.into_inner())
}
fn flush_buffer(&mut self, buffer: &mut Vec<Event<'_>>) -> io::Result<()> {
for event in buffer.drain(0..) {
self.writer.write_event(event).map_err(map_err)?;
}
Ok(())
}
}
/// Writes a RDF/XML file to a [`AsyncWrite`] implementation. Can be built using [`RdfXmlSerializer::serialize_to_tokio_async_write`].
///
/// ```
/// use oxrdf::{NamedNodeRef, TripleRef};
/// use oxrdfxml::RdfXmlSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = RdfXmlSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_triple(TripleRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// )).await?;
/// assert_eq!(
/// b"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\">\n\t<rdf:Description rdf:about=\"http://example.com#me\">\n\t\t<type xmlns=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" rdf:resource=\"http://schema.org/Person\"/>\n\t</rdf:Description>\n</rdf:RDF>",
/// writer.finish().await?.as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteRdfXmlWriter<W: AsyncWrite + Unpin> {
writer: Writer<W>,
inner: InnerRdfXmlWriter,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteRdfXmlWriter<W> {
/// Writes an extra triple.
#[allow(clippy::match_wildcard_for_single_variants, unreachable_patterns)]
pub async fn write_triple<'a>(&mut self, t: impl Into<TripleRef<'a>>) -> io::Result<()> {
let mut buffer = Vec::new();
self.inner.write_triple(t, &mut buffer)?;
self.flush_buffer(&mut buffer).await
}
/// Ends the write process and returns the underlying [`Write`].
pub async fn finish(mut self) -> io::Result<W> {
let mut buffer = Vec::new();
self.inner.finish(&mut buffer);
self.flush_buffer(&mut buffer).await?;
Ok(self.writer.into_inner())
}
async fn flush_buffer(&mut self, buffer: &mut Vec<Event<'_>>) -> io::Result<()> {
for event in buffer.drain(0..) {
self.writer
.write_event_async(event)
.await
.map_err(map_err)?;
}
Ok(())
}
}
pub struct InnerRdfXmlWriter {
current_subject: Option<Subject>,
}
impl InnerRdfXmlWriter {
#[allow(clippy::match_wildcard_for_single_variants, unreachable_patterns)]
fn write_triple<'a>(
&mut self,
t: impl Into<TripleRef<'a>>,
output: &mut Vec<Event<'a>>,
) -> io::Result<()> {
if self.current_subject.is_none() {
self.write_start()?;
Self::write_start(output);
}
let triple = t.into();
// We open a new rdf:Description if useful
if self.current_subject.as_ref().map(Subject::as_ref) != Some(triple.subject) {
if self.current_subject.is_some() {
self.writer
.write_event(Event::End(BytesEnd::new("rdf:Description")))
.map_err(map_err)?;
output.push(Event::End(BytesEnd::new("rdf:Description")));
}
let mut description_open = BytesStart::new("rdf:Description");
@ -116,10 +247,9 @@ impl<W: Write> ToWriteRdfXmlWriter<W> {
))
}
}
self.writer
.write_event(Event::Start(description_open))
.map_err(map_err)?;
output.push(Event::Start(description_open));
}
self.current_subject = Some(triple.subject.into_owned());
let (prop_prefix, prop_value) = split_iri(triple.predicate.as_str());
let (prop_qname, prop_xmlns) = if prop_value.is_empty() {
@ -127,25 +257,24 @@ impl<W: Write> ToWriteRdfXmlWriter<W> {
} else {
(prop_value, ("xmlns", prop_prefix))
};
let property_element = self.writer.create_element(prop_qname);
let property_element = property_element.with_attribute(prop_xmlns);
match triple.object {
TermRef::NamedNode(node) => property_element
.with_attribute(("rdf:resource", node.as_str()))
.write_empty(),
TermRef::BlankNode(node) => property_element
.with_attribute(("rdf:nodeID", node.as_str()))
.write_empty(),
let mut property_open = BytesStart::new(prop_qname);
property_open.push_attribute(prop_xmlns);
let content = match triple.object {
TermRef::NamedNode(node) => {
property_open.push_attribute(("rdf:resource", node.as_str()));
None
}
TermRef::BlankNode(node) => {
property_open.push_attribute(("rdf:nodeID", node.as_str()));
None
}
TermRef::Literal(literal) => {
let property_element = if let Some(language) = literal.language() {
property_element.with_attribute(("xml:lang", language))
if let Some(language) = literal.language() {
property_open.push_attribute(("xml:lang", language));
} else if !literal.is_plain() {
property_element.with_attribute(("rdf:datatype", literal.datatype().as_str()))
} else {
property_element
};
property_element.write_text_content(BytesText::new(literal.value()))
property_open.push_attribute(("rdf:datatype", literal.datatype().as_str()));
}
Some(literal.value())
}
_ => {
return Err(io::Error::new(
@ -153,37 +282,31 @@ impl<W: Write> ToWriteRdfXmlWriter<W> {
"RDF/XML only supports named, blank or literal object",
))
}
};
if let Some(content) = content {
output.push(Event::Start(property_open));
output.push(Event::Text(BytesText::new(content)));
output.push(Event::End(BytesEnd::new(prop_qname)));
} else {
output.push(Event::Empty(property_open));
}
.map_err(map_err)?;
self.current_subject = Some(triple.subject.into_owned());
Ok(())
}
pub fn write_start(&mut self) -> io::Result<()> {
// We open the file
self.writer
.write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)))
.map_err(map_err)?;
fn write_start(output: &mut Vec<Event<'_>>) {
output.push(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)));
let mut rdf_open = BytesStart::new("rdf:RDF");
rdf_open.push_attribute(("xmlns:rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#"));
self.writer
.write_event(Event::Start(rdf_open))
.map_err(map_err)
output.push(Event::Start(rdf_open))
}
/// Ends the write process and returns the underlying [`Write`].
pub fn finish(mut self) -> io::Result<W> {
fn finish(&self, output: &mut Vec<Event<'static>>) {
if self.current_subject.is_some() {
self.writer
.write_event(Event::End(BytesEnd::new("rdf:Description")))
.map_err(map_err)?;
output.push(Event::End(BytesEnd::new("rdf:Description")));
} else {
self.write_start()?;
Self::write_start(output);
}
self.writer
.write_event(Event::End(BytesEnd::new("rdf:RDF")))
.map_err(map_err)?;
Ok(self.writer.into_inner())
output.push(Event::End(BytesEnd::new("rdf:RDF")));
}
}

@ -16,12 +16,17 @@ rust-version = "1.65"
[features]
default = []
rdf-star = ["oxrdf/rdf-star"]
async-tokio = ["dep:tokio"]
[dependencies]
memchr = "2"
oxrdf = { version = "0.2.0-alpha.1-dev", path = "../oxrdf" }
oxiri = "0.2"
oxilangtag = "0.1"
tokio = { version = "1", optional = true, features = ["io-util"] }
[dev-dependencies]
tokio = { version = "1", features = ["rt", "macros"] }
[package.metadata.docs.rs]
all-features = true

@ -1,6 +1,8 @@
//! A [N3](https://w3c.github.io/N3/spec/) streaming parser implemented by [`N3Parser`].
use crate::lexer::{resolve_local_name, N3Lexer, N3LexerMode, N3LexerOptions, N3Token};
#[cfg(feature = "async-tokio")]
use crate::toolkit::FromTokioAsyncReadIterator;
use crate::toolkit::{
FromReadIterator, Lexer, Parser, RuleRecognizer, RuleRecognizerError, SyntaxError,
};
@ -16,6 +18,8 @@ use oxrdf::{
use std::collections::HashMap;
use std::fmt;
use std::io::Read;
#[cfg(feature = "async-tokio")]
use tokio::io::AsyncRead;
/// A N3 term i.e. a RDF `Term` or a `Variable`.
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
@ -261,6 +265,47 @@ impl N3Parser {
}
}
/// Parses a N3 file from a [`AsyncRead`] implementation.
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNode, vocab::rdf};
/// use oxttl::n3::{N3Parser, N3Term};
/// use oxttl::ParseError;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"@base <http://example.com/> .
/// @prefix schema: <http://schema.org/> .
/// <foo> a schema:Person ;
/// schema:name \"Foo\" .
/// <bar> a schema:Person ;
/// schema:name \"Bar\" .";
///
/// let rdf_type = N3Term::NamedNode(rdf::TYPE.into_owned());
/// let schema_person = N3Term::NamedNode(NamedNode::new_unchecked("http://schema.org/Person"));
/// let mut count = 0;
/// let mut parser = N3Parser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf_type && triple.object == schema_person {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn parse_from_tokio_async_read<R: AsyncRead + Unpin>(
&self,
read: R,
) -> FromTokioAsyncReadN3Reader<R> {
FromTokioAsyncReadN3Reader {
inner: self.parse().parser.parse_from_tokio_async_read(read),
}
}
/// Allows to parse a N3 file by using a low-level API.
///
/// Count the number of people:
@ -343,6 +388,50 @@ impl<R: Read> Iterator for FromReadN3Reader<R> {
}
}
/// Parses a N3 file from a [`AsyncRead`] implementation. Can be built using [`N3Parser::parse_from_tokio_async_read`].
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNode, vocab::rdf};
/// use oxttl::n3::{N3Parser, N3Term};
/// use oxttl::ParseError;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"@base <http://example.com/> .
/// @prefix schema: <http://schema.org/> .
/// <foo> a schema:Person ;
/// schema:name \"Foo\" .
/// <bar> a schema:Person ;
/// schema:name \"Bar\" .";
///
/// let rdf_type = N3Term::NamedNode(rdf::TYPE.into_owned());
/// let schema_person = N3Term::NamedNode(NamedNode::new_unchecked("http://schema.org/Person"));
/// let mut count = 0;
/// let mut parser = N3Parser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf_type && triple.object == schema_person {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadN3Reader<R: AsyncRead + Unpin> {
inner: FromTokioAsyncReadIterator<R, N3Recognizer>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadN3Reader<R> {
/// Reads the next triple or returns `None` if the file is finished.
pub async fn next(&mut self) -> Option<Result<N3Quad, ParseError>> {
Some(self.inner.next().await?.map(Into::into))
}
}
/// Parses a N3 file by using a low-level API. Can be built using [`N3Parser::parse`].
///
/// Count the number of people:

@ -1,9 +1,13 @@
//! A [N-Quads](https://www.w3.org/TR/n-quads/) streaming parser implemented by [`NQuadsParser`].
use crate::line_formats::NQuadsRecognizer;
#[cfg(feature = "async-tokio")]
use crate::toolkit::FromTokioAsyncReadIterator;
use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError};
use oxrdf::{Quad, QuadRef};
use std::io::{self, Read, Write};
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
/// A [N-Quads](https://www.w3.org/TR/n-quads/) streaming parser.
///
@ -81,6 +85,43 @@ impl NQuadsParser {
}
}
/// Parses a N-Quads file from a [`AsyncRead`] implementation.
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, NQuadsParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"<http://example.com/foo> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/foo> <http://schema.org/name> \"Foo\" .
/// <http://example.com/bar> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/bar> <http://schema.org/name> \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = NQuadsParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn parse_from_tokio_async_read<R: AsyncRead + Unpin>(
&self,
read: R,
) -> FromTokioAsyncReadNQuadsReader<R> {
FromTokioAsyncReadNQuadsReader {
inner: self.parse().parser.parse_from_tokio_async_read(read),
}
}
/// Allows to parse a N-Quads file by using a low-level API.
///
/// Count the number of people:
@ -164,6 +205,46 @@ impl<R: Read> Iterator for FromReadNQuadsReader<R> {
}
}
/// Parses a N-Quads file from a [`AsyncRead`] implementation. Can be built using [`NQuadsParser::parse_from_tokio_async_read`].
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, NQuadsParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"<http://example.com/foo> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/foo> <http://schema.org/name> \"Foo\" .
/// <http://example.com/bar> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/bar> <http://schema.org/name> \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = NQuadsParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadNQuadsReader<R: AsyncRead + Unpin> {
inner: FromTokioAsyncReadIterator<R, NQuadsRecognizer>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadNQuadsReader<R> {
/// Reads the next triple or returns `None` if the file is finished.
pub async fn next(&mut self) -> Option<Result<Quad, ParseError>> {
Some(self.inner.next().await?.map(Into::into))
}
}
/// Parses a N-Quads file by using a low-level API. Can be built using [`NQuadsParser::parse`].
///
/// Count the number of people:
@ -288,6 +369,41 @@ impl NQuadsSerializer {
}
}
/// Writes a N-Quads file to a [`AsyncWrite`] implementation.
///
/// ```
/// use oxrdf::{NamedNodeRef, QuadRef};
/// use oxttl::NQuadsSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = NQuadsSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_quad(QuadRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// NamedNodeRef::new_unchecked("http://example.com"),
/// )).await?;
/// assert_eq!(
/// b"<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> <http://example.com> .\n",
/// writer.finish().as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn serialize_to_tokio_async_write<W: AsyncWrite + Unpin>(
&self,
write: W,
) -> ToTokioAsyncWriteNQuadsWriter<W> {
ToTokioAsyncWriteNQuadsWriter {
write,
writer: self.serialize(),
buffer: Vec::new(),
}
}
/// Builds a low-level N-Quads writer.
///
/// ```
@ -350,6 +466,52 @@ impl<W: Write> ToWriteNQuadsWriter<W> {
}
}
/// Writes a N-Quads file to a [`AsyncWrite`] implementation. Can be built using [`NQuadsSerializer::serialize_to_tokio_async_write`].
///
/// ```
/// use oxrdf::{NamedNodeRef, QuadRef};
/// use oxttl::NQuadsSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = NQuadsSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_quad(QuadRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// NamedNodeRef::new_unchecked("http://example.com"),
/// )).await?;
/// assert_eq!(
/// b"<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> <http://example.com> .\n",
/// writer.finish().as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteNQuadsWriter<W: AsyncWrite + Unpin> {
write: W,
writer: LowLevelNQuadsWriter,
buffer: Vec<u8>,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteNQuadsWriter<W> {
/// Writes an extra quad.
pub async fn write_quad<'a>(&mut self, q: impl Into<QuadRef<'a>>) -> io::Result<()> {
self.writer.write_quad(q, &mut self.buffer)?;
self.write.write_all(&self.buffer).await?;
self.buffer.clear();
Ok(())
}
/// Ends the write process and returns the underlying [`Write`].
pub fn finish(self) -> W {
self.write
}
}
/// Writes a N-Quads file by using a low-level API. Can be built using [`NQuadsSerializer::serialize`].
///
/// ```

@ -2,9 +2,13 @@
//! and a serializer implemented by [`NTriplesSerializer`].
use crate::line_formats::NQuadsRecognizer;
#[cfg(feature = "async-tokio")]
use crate::toolkit::FromTokioAsyncReadIterator;
use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError};
use oxrdf::{Triple, TripleRef};
use std::io::{self, Read, Write};
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
/// A [N-Triples](https://www.w3.org/TR/n-triples/) streaming parser.
///
@ -82,6 +86,43 @@ impl NTriplesParser {
}
}
/// Parses a N-Triples file from a [`AsyncRead`] implementation.
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, NTriplesParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"<http://example.com/foo> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/foo> <http://schema.org/name> \"Foo\" .
/// <http://example.com/bar> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/bar> <http://schema.org/name> \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = NTriplesParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn parse_from_tokio_async_read<R: AsyncRead + Unpin>(
&self,
read: R,
) -> FromTokioAsyncReadNTriplesReader<R> {
FromTokioAsyncReadNTriplesReader {
inner: self.parse().parser.parse_from_tokio_async_read(read),
}
}
/// Allows to parse a N-Triples file by using a low-level API.
///
/// Count the number of people:
@ -165,6 +206,46 @@ impl<R: Read> Iterator for FromReadNTriplesReader<R> {
}
}
/// Parses a N-Triples file from a [`AsyncRead`] implementation. Can be built using [`NTriplesParser::parse_from_tokio_async_read`].
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, NTriplesParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"<http://example.com/foo> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/foo> <http://schema.org/name> \"Foo\" .
/// <http://example.com/bar> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .
/// <http://example.com/bar> <http://schema.org/name> \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = NTriplesParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadNTriplesReader<R: AsyncRead + Unpin> {
inner: FromTokioAsyncReadIterator<R, NQuadsRecognizer>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadNTriplesReader<R> {
/// Reads the next triple or returns `None` if the file is finished.
pub async fn next(&mut self) -> Option<Result<Triple, ParseError>> {
Some(self.inner.next().await?.map(Into::into))
}
}
/// Parses a N-Triples file by using a low-level API. Can be built using [`NTriplesParser::parse`].
///
/// Count the number of people:
@ -287,6 +368,40 @@ impl NTriplesSerializer {
}
}
/// Writes a N-Triples file to a [`AsyncWrite`] implementation.
///
/// ```
/// use oxrdf::{NamedNodeRef, TripleRef};
/// use oxttl::NTriplesSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = NTriplesSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_triple(TripleRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// )).await?;
/// assert_eq!(
/// b"<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .\n",
/// writer.finish().as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn serialize_to_tokio_async_write<W: AsyncWrite + Unpin>(
&self,
write: W,
) -> ToTokioAsyncWriteNTriplesWriter<W> {
ToTokioAsyncWriteNTriplesWriter {
write,
writer: self.serialize(),
buffer: Vec::new(),
}
}
/// Builds a low-level N-Triples writer.
///
/// ```
@ -347,6 +462,51 @@ impl<W: Write> ToWriteNTriplesWriter<W> {
}
}
/// Writes a N-Triples file to a [`AsyncWrite`] implementation. Can be built using [`NTriplesSerializer::serialize_to_tokio_async_write`].
///
/// ```
/// use oxrdf::{NamedNodeRef, TripleRef};
/// use oxttl::NTriplesSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = NTriplesSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_triple(TripleRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person")
/// )).await?;
/// assert_eq!(
/// b"<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .\n",
/// writer.finish().as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteNTriplesWriter<W: AsyncWrite + Unpin> {
write: W,
writer: LowLevelNTriplesWriter,
buffer: Vec<u8>,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteNTriplesWriter<W> {
/// Writes an extra triple.
pub async fn write_triple<'a>(&mut self, t: impl Into<TripleRef<'a>>) -> io::Result<()> {
self.writer.write_triple(t, &mut self.buffer)?;
self.write.write_all(&self.buffer).await?;
self.buffer.clear();
Ok(())
}
/// Ends the write process and returns the underlying [`Write`].
pub fn finish(self) -> W {
self.write
}
}
/// Writes a N-Triples file by using a low-level API. Can be built using [`NTriplesSerializer::serialize`].
///
/// ```

@ -3,6 +3,8 @@ use std::error::Error;
use std::fmt;
use std::io::{self, Read};
use std::ops::{Range, RangeInclusive};
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, AsyncReadExt};
pub trait TokenRecognizer {
type Token<'a>
@ -122,6 +124,35 @@ impl<R: TokenRecognizer> Lexer<R> {
Ok(())
}
#[cfg(feature = "async-tokio")]
pub async fn extend_from_tokio_async_read(
&mut self,
read: &mut (impl AsyncRead + Unpin),
) -> io::Result<()> {
self.shrink_if_useful();
let min_end = self.end + self.min_buffer_size;
if min_end > self.max_buffer_size {
return Err(io::Error::new(
io::ErrorKind::OutOfMemory,
format!(
"The buffer maximal size is {} < {min_end}",
self.max_buffer_size
),
));
}
if self.data.len() < min_end {
self.data.resize(min_end, 0);
}
if self.data.len() < self.data.capacity() {
// We keep extending to have as much space as available without reallocation
self.data.resize(self.data.capacity(), 0);
}
let read = read.read(&mut self.data[self.end..]).await?;
self.end += read;
self.is_ending = read == 0;
Ok(())
}
pub fn read_next(
&mut self,
options: &R::Options,

@ -6,6 +6,8 @@ mod lexer;
mod parser;
pub use self::lexer::{Lexer, LexerError, TokenRecognizer, TokenRecognizerError};
#[cfg(feature = "async-tokio")]
pub use self::parser::FromTokioAsyncReadIterator;
pub use self::parser::{
FromReadIterator, ParseError, Parser, RuleRecognizer, RuleRecognizerError, SyntaxError,
};

@ -4,6 +4,8 @@ use std::error::Error;
use std::io::Read;
use std::ops::Range;
use std::{fmt, io};
#[cfg(feature = "async-tokio")]
use tokio::io::AsyncRead;
pub trait RuleRecognizer: Sized {
type TokenRecognizer: TokenRecognizer;
@ -114,6 +116,14 @@ impl<RR: RuleRecognizer> Parser<RR> {
pub fn parse_from_read<R: Read>(self, read: R) -> FromReadIterator<R, RR> {
FromReadIterator { read, parser: self }
}
#[cfg(feature = "async-tokio")]
pub fn parse_from_tokio_async_read<R: AsyncRead + Unpin>(
self,
read: R,
) -> FromTokioAsyncReadIterator<R, RR> {
FromTokioAsyncReadIterator { read, parser: self }
}
}
/// An error in the syntax of the parsed file.
@ -258,3 +268,29 @@ impl<R: Read, RR: RuleRecognizer> Iterator for FromReadIterator<R, RR> {
None
}
}
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadIterator<R: AsyncRead + Unpin, RR: RuleRecognizer> {
read: R,
parser: Parser<RR>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin, RR: RuleRecognizer> FromTokioAsyncReadIterator<R, RR> {
pub async fn next(&mut self) -> Option<Result<RR::Output, ParseError>> {
while !self.parser.is_end() {
if let Some(result) = self.parser.read_next() {
return Some(result.map_err(ParseError::Syntax));
}
if let Err(e) = self
.parser
.lexer
.extend_from_tokio_async_read(&mut self.read)
.await
{
return Some(Err(e.into()));
}
}
None
}
}

@ -1,12 +1,16 @@
//! A [TriG](https://www.w3.org/TR/trig/) streaming parser implemented by [`TriGParser`].
use crate::terse::TriGRecognizer;
#[cfg(feature = "async-tokio")]
use crate::toolkit::FromTokioAsyncReadIterator;
use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError};
use oxiri::{Iri, IriParseError};
use oxrdf::{vocab::xsd, GraphName, NamedNode, Quad, QuadRef, Subject, TermRef};
use std::collections::HashMap;
use std::fmt;
use std::io::{self, Read, Write};
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
/// A [TriG](https://www.w3.org/TR/trig/) streaming parser.
///
@ -107,6 +111,45 @@ impl TriGParser {
}
}
/// Parses a TriG file from a [`AsyncRead`] implementation.
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, TriGParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"@base <http://example.com/> .
/// @prefix schema: <http://schema.org/> .
/// <foo> a schema:Person ;
/// schema:name \"Foo\" .
/// <bar> a schema:Person ;
/// schema:name \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = TriGParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn parse_from_tokio_async_read<R: AsyncRead + Unpin>(
&self,
read: R,
) -> FromTokioAsyncReadTriGReader<R> {
FromTokioAsyncReadTriGReader {
inner: self.parse().parser.parse_from_tokio_async_read(read),
}
}
/// Allows to parse a TriG file by using a low-level API.
///
/// Count the number of people:
@ -193,6 +236,48 @@ impl<R: Read> Iterator for FromReadTriGReader<R> {
}
}
/// Parses a TriG file from a [`AsyncRead`] implementation. Can be built using [`TriGParser::parse_from_tokio_async_read`].
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, TriGParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"@base <http://example.com/> .
/// @prefix schema: <http://schema.org/> .
/// <foo> a schema:Person ;
/// schema:name \"Foo\" .
/// <bar> a schema:Person ;
/// schema:name \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = TriGParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadTriGReader<R: AsyncRead + Unpin> {
inner: FromTokioAsyncReadIterator<R, TriGRecognizer>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadTriGReader<R> {
/// Reads the next triple or returns `None` if the file is finished.
pub async fn next(&mut self) -> Option<Result<Quad, ParseError>> {
Some(self.inner.next().await?.map(Into::into))
}
}
/// Parses a TriG file by using a low-level API. Can be built using [`TriGParser::parse`].
///
/// Count the number of people:
@ -317,6 +402,41 @@ impl TriGSerializer {
}
}
/// Writes a TriG file to a [`AsyncWrite`] implementation.
///
/// ```
/// use oxrdf::{NamedNodeRef, QuadRef};
/// use oxttl::TriGSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = TriGSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_quad(QuadRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// NamedNodeRef::new_unchecked("http://example.com"),
/// )).await?;
/// assert_eq!(
/// b"<http://example.com> {\n\t<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .\n}\n",
/// writer.finish().await?.as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn serialize_to_tokio_async_write<W: AsyncWrite + Unpin>(
&self,
write: W,
) -> ToTokioAsyncWriteTriGWriter<W> {
ToTokioAsyncWriteTriGWriter {
write,
writer: self.serialize(),
buffer: Vec::new(),
}
}
/// Builds a low-level TriG writer.
///
/// ```
@ -384,6 +504,55 @@ impl<W: Write> ToWriteTriGWriter<W> {
}
}
/// Writes a TriG file to a [`AsyncWrite`] implementation. Can be built using [`TriGSerializer::serialize_to_tokio_async_write`].
///
/// ```
/// use oxrdf::{NamedNodeRef, QuadRef};
/// use oxttl::TriGSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = TriGSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_quad(QuadRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// NamedNodeRef::new_unchecked("http://example.com"),
/// )).await?;
/// assert_eq!(
/// b"<http://example.com> {\n\t<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .\n}\n",
/// writer.finish().await?.as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteTriGWriter<W: AsyncWrite + Unpin> {
write: W,
writer: LowLevelTriGWriter,
buffer: Vec<u8>,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteTriGWriter<W> {
/// Writes an extra quad.
pub async fn write_quad<'a>(&mut self, q: impl Into<QuadRef<'a>>) -> io::Result<()> {
self.writer.write_quad(q, &mut self.buffer)?;
self.write.write_all(&self.buffer).await?;
self.buffer.clear();
Ok(())
}
/// Ends the write process and returns the underlying [`Write`].
pub async fn finish(mut self) -> io::Result<W> {
self.writer.finish(&mut self.buffer)?;
self.write.write_all(&self.buffer).await?;
self.buffer.clear();
Ok(self.write)
}
}
/// Writes a TriG file by using a low-level API. Can be built using [`TriGSerializer::serialize`].
///
/// ```

@ -1,13 +1,18 @@
//! A [Turtle](https://www.w3.org/TR/turtle/) streaming parser implemented by [`TurtleParser`].
use crate::terse::TriGRecognizer;
#[cfg(feature = "async-tokio")]
use crate::toolkit::FromTokioAsyncReadIterator;
use crate::toolkit::{FromReadIterator, ParseError, Parser, SyntaxError};
use crate::trig::{LowLevelTriGWriter, ToWriteTriGWriter};
use crate::TriGSerializer;
#[cfg(feature = "async-tokio")]
use crate::trig::ToTokioAsyncWriteTriGWriter;
use crate::trig::{LowLevelTriGWriter, ToWriteTriGWriter, TriGSerializer};
use oxiri::{Iri, IriParseError};
use oxrdf::{GraphNameRef, Triple, TripleRef};
use std::collections::HashMap;
use std::io::{self, Read, Write};
#[cfg(feature = "async-tokio")]
use tokio::io::{AsyncRead, AsyncWrite};
/// A [Turtle](https://www.w3.org/TR/turtle/) streaming parser.
///
@ -108,6 +113,45 @@ impl TurtleParser {
}
}
/// Parses a Turtle file from a [`AsyncRead`] implementation.
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, TurtleParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"@base <http://example.com/> .
/// @prefix schema: <http://schema.org/> .
/// <foo> a schema:Person ;
/// schema:name \"Foo\" .
/// <bar> a schema:Person ;
/// schema:name \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = TurtleParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn parse_from_tokio_async_read<R: AsyncRead + Unpin>(
&self,
read: R,
) -> FromTokioAsyncReadTurtleReader<R> {
FromTokioAsyncReadTurtleReader {
inner: self.parse().parser.parse_from_tokio_async_read(read),
}
}
/// Allows to parse a Turtle file by using a low-level API.
///
/// Count the number of people:
@ -194,6 +238,48 @@ impl<R: Read> Iterator for FromReadTurtleReader<R> {
}
}
/// Parses a Turtle file from a [`AsyncRead`] implementation. Can be built using [`TurtleParser::parse_from_tokio_async_read`].
///
/// Count the number of people:
/// ```
/// use oxrdf::{NamedNodeRef, vocab::rdf};
/// use oxttl::{ParseError, TurtleParser};
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<(), ParseError> {
/// let file = b"@base <http://example.com/> .
/// @prefix schema: <http://schema.org/> .
/// <foo> a schema:Person ;
/// schema:name \"Foo\" .
/// <bar> a schema:Person ;
/// schema:name \"Bar\" .";
///
/// let schema_person = NamedNodeRef::new_unchecked("http://schema.org/Person");
/// let mut count = 0;
/// let mut parser = TurtleParser::new().parse_from_tokio_async_read(file.as_ref());
/// while let Some(triple) = parser.next().await {
/// let triple = triple?;
/// if triple.predicate == rdf::TYPE && triple.object == schema_person.into() {
/// count += 1;
/// }
/// }
/// assert_eq!(2, count);
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct FromTokioAsyncReadTurtleReader<R: AsyncRead + Unpin> {
inner: FromTokioAsyncReadIterator<R, TriGRecognizer>,
}
#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> FromTokioAsyncReadTurtleReader<R> {
/// Reads the next triple or returns `None` if the file is finished.
pub async fn next(&mut self) -> Option<Result<Triple, ParseError>> {
Some(self.inner.next().await?.map(Into::into))
}
}
/// Parses a Turtle file by using a low-level API. Can be built using [`TurtleParser::parse`].
///
/// Count the number of people:
@ -317,6 +403,38 @@ impl TurtleSerializer {
}
}
/// Writes a Turtle file to a [`AsyncWrite`] implementation.
///
/// ```
/// use oxrdf::{NamedNodeRef, TripleRef};
/// use oxttl::TurtleSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = TurtleSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_triple(TripleRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person"),
/// )).await?;
/// assert_eq!(
/// b"<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .\n",
/// writer.finish().await?.as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub fn serialize_to_tokio_async_write<W: AsyncWrite + Unpin>(
&self,
write: W,
) -> ToTokioAsyncWriteTurtleWriter<W> {
ToTokioAsyncWriteTurtleWriter {
inner: self.inner.serialize_to_tokio_async_write(write),
}
}
/// Builds a low-level Turtle writer.
///
/// ```
@ -379,6 +497,48 @@ impl<W: Write> ToWriteTurtleWriter<W> {
}
}
/// Writes a Turtle file to a [`AsyncWrite`] implementation. Can be built using [`TurtleSerializer::serialize_to_tokio_async_write`].
///
/// ```
/// use oxrdf::{NamedNodeRef, TripleRef};
/// use oxttl::TurtleSerializer;
/// use std::io::Result;
///
/// #[tokio::main(flavor = "current_thread")]
/// async fn main() -> Result<()> {
/// let mut writer = TurtleSerializer::new().serialize_to_tokio_async_write(Vec::new());
/// writer.write_triple(TripleRef::new(
/// NamedNodeRef::new_unchecked("http://example.com#me"),
/// NamedNodeRef::new_unchecked("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
/// NamedNodeRef::new_unchecked("http://schema.org/Person")
/// )).await?;
/// assert_eq!(
/// b"<http://example.com#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Person> .\n",
/// writer.finish().await?.as_slice()
/// );
/// Ok(())
/// }
/// ```
#[cfg(feature = "async-tokio")]
pub struct ToTokioAsyncWriteTurtleWriter<W: AsyncWrite + Unpin> {
inner: ToTokioAsyncWriteTriGWriter<W>,
}
#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteTurtleWriter<W> {
/// Writes an extra triple.
pub async fn write_triple<'a>(&mut self, t: impl Into<TripleRef<'a>>) -> io::Result<()> {
self.inner
.write_quad(t.into().in_graph(GraphNameRef::DefaultGraph))
.await
}
/// Ends the write process and returns the underlying [`Write`].
pub async fn finish(self) -> io::Result<W> {
self.inner.finish().await
}
}
/// Writes a Turtle file by using a low-level API. Can be built using [`TurtleSerializer::serialize`].
///
/// ```

Loading…
Cancel
Save