RDF serialization: removes implicit flush

Allows to the user to control flushing
pull/599/head
Tpt 1 year ago committed by Thomas Tanon
parent 7c227830e9
commit c6e55c706a
  1. 6
      js/src/store.rs
  2. 21
      lib/oxrdfio/src/serializer.rs
  3. 4
      lib/src/io/write.rs
  4. 21
      lib/src/store.rs
  5. 3
      lib/tests/store.rs
  6. 28
      python/src/io.rs
  7. 8
      python/src/store.rs
  8. 50
      server/src/main.rs

@ -181,12 +181,12 @@ impl JsStore {
let Some(format) = RdfFormat::from_media_type(mime_type) else { let Some(format) = RdfFormat::from_media_type(mime_type) else {
return Err(format_err!("Not supported MIME type: {mime_type}")); return Err(format_err!("Not supported MIME type: {mime_type}"));
}; };
let mut buffer = Vec::new(); let buffer =
if let Some(from_graph_name) = FROM_JS.with(|c| c.to_optional_term(from_graph_name))? { if let Some(from_graph_name) = FROM_JS.with(|c| c.to_optional_term(from_graph_name))? {
self.store self.store
.dump_graph(&mut buffer, format, &GraphName::try_from(from_graph_name)?) .dump_graph(Vec::new(), format, &GraphName::try_from(from_graph_name)?)
} else { } else {
self.store.dump_dataset(&mut buffer, format) self.store.dump_dataset(Vec::new(), format)
} }
.map_err(to_err)?; .map_err(to_err)?;
String::from_utf8(buffer).map_err(to_err) String::from_utf8(buffer).map_err(to_err)

@ -19,7 +19,7 @@ use oxttl::turtle::ToTokioAsyncWriteTurtleWriter;
use oxttl::turtle::{ToWriteTurtleWriter, TurtleSerializer}; use oxttl::turtle::{ToWriteTurtleWriter, TurtleSerializer};
use std::io::{self, Write}; use std::io::{self, Write};
#[cfg(feature = "async-tokio")] #[cfg(feature = "async-tokio")]
use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::io::AsyncWrite;
/// A serializer for RDF serialization formats. /// A serializer for RDF serialization formats.
/// ///
@ -214,15 +214,16 @@ impl<W: Write> ToWriteQuadWriter<W> {
} }
/// Writes the last bytes of the file /// Writes the last bytes of the file
pub fn finish(self) -> io::Result<()> { ///
match self.formatter { /// Note that this function does not flush the writer. You need to do that if you are using a [`BufWriter`](io::BufWriter).
pub fn finish(self) -> io::Result<W> {
Ok(match self.formatter {
ToWriteQuadWriterKind::NQuads(writer) => writer.finish(), ToWriteQuadWriterKind::NQuads(writer) => writer.finish(),
ToWriteQuadWriterKind::NTriples(writer) => writer.finish(), ToWriteQuadWriterKind::NTriples(writer) => writer.finish(),
ToWriteQuadWriterKind::RdfXml(writer) => writer.finish()?, ToWriteQuadWriterKind::RdfXml(writer) => writer.finish()?,
ToWriteQuadWriterKind::TriG(writer) => writer.finish()?, ToWriteQuadWriterKind::TriG(writer) => writer.finish()?,
ToWriteQuadWriterKind::Turtle(writer) => writer.finish()?, ToWriteQuadWriterKind::Turtle(writer) => writer.finish()?,
} })
.flush()
} }
} }
@ -296,16 +297,16 @@ impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteQuadWriter<W> {
} }
/// Writes the last bytes of the file /// Writes the last bytes of the file
pub async fn finish(self) -> io::Result<()> { ///
match self.formatter { /// Note that this function does not flush the writer. You need to do that if you are using a [`BufWriter`](io::BufWriter).
pub async fn finish(self) -> io::Result<W> {
Ok(match self.formatter {
ToTokioAsyncWriteQuadWriterKind::NQuads(writer) => writer.finish(), ToTokioAsyncWriteQuadWriterKind::NQuads(writer) => writer.finish(),
ToTokioAsyncWriteQuadWriterKind::NTriples(writer) => writer.finish(), ToTokioAsyncWriteQuadWriterKind::NTriples(writer) => writer.finish(),
ToTokioAsyncWriteQuadWriterKind::RdfXml(writer) => writer.finish().await?, ToTokioAsyncWriteQuadWriterKind::RdfXml(writer) => writer.finish().await?,
ToTokioAsyncWriteQuadWriterKind::TriG(writer) => writer.finish().await?, ToTokioAsyncWriteQuadWriterKind::TriG(writer) => writer.finish().await?,
ToTokioAsyncWriteQuadWriterKind::Turtle(writer) => writer.finish().await?, ToTokioAsyncWriteQuadWriterKind::Turtle(writer) => writer.finish().await?,
} })
.flush()
.await
} }
} }

@ -86,7 +86,7 @@ impl<W: Write> TripleWriter<W> {
/// Writes the last bytes of the file /// Writes the last bytes of the file
pub fn finish(self) -> io::Result<()> { pub fn finish(self) -> io::Result<()> {
self.writer.finish() self.writer.finish()?.flush()
} }
} }
@ -170,6 +170,6 @@ impl<W: Write> QuadWriter<W> {
/// Writes the last bytes of the file /// Writes the last bytes of the file
pub fn finish(self) -> io::Result<()> { pub fn finish(self) -> io::Result<()> {
self.writer.finish() self.writer.finish()?.flush()
} }
} }

@ -617,18 +617,17 @@ impl Store {
/// assert_eq!(file, buffer.as_slice()); /// assert_eq!(file, buffer.as_slice());
/// # std::io::Result::Ok(()) /// # std::io::Result::Ok(())
/// ``` /// ```
pub fn dump_graph<'a>( pub fn dump_graph<'a, W: Write>(
&self, &self,
write: impl Write, write: W,
format: impl Into<RdfFormat>, format: impl Into<RdfFormat>,
from_graph_name: impl Into<GraphNameRef<'a>>, from_graph_name: impl Into<GraphNameRef<'a>>,
) -> Result<(), SerializerError> { ) -> Result<W, SerializerError> {
let mut writer = RdfSerializer::from_format(format.into()).serialize_to_write(write); let mut writer = RdfSerializer::from_format(format.into()).serialize_to_write(write);
for quad in self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) { for quad in self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) {
writer.write_triple(quad?.as_ref())?; writer.write_triple(quad?.as_ref())?;
} }
writer.finish()?; Ok(writer.finish()?)
Ok(())
} }
/// Dumps the store into a file. /// Dumps the store into a file.
@ -642,16 +641,15 @@ impl Store {
/// let store = Store::new()?; /// let store = Store::new()?;
/// store.load_dataset(file, RdfFormat::NQuads, None)?; /// store.load_dataset(file, RdfFormat::NQuads, None)?;
/// ///
/// let mut buffer = Vec::new(); /// let buffer = store.dump_dataset(Vec::new(), RdfFormat::NQuads)?;
/// store.dump_dataset(&mut buffer, RdfFormat::NQuads)?;
/// assert_eq!(file, buffer.as_slice()); /// assert_eq!(file, buffer.as_slice());
/// # std::io::Result::Ok(()) /// # std::io::Result::Ok(())
/// ``` /// ```
pub fn dump_dataset( pub fn dump_dataset<W: Write>(
&self, &self,
write: impl Write, write: W,
format: impl Into<RdfFormat>, format: impl Into<RdfFormat>,
) -> Result<(), SerializerError> { ) -> Result<W, SerializerError> {
let format = format.into(); let format = format.into();
if !format.supports_datasets() { if !format.supports_datasets() {
return Err(SerializerError::DatasetFormatExpected(format)); return Err(SerializerError::DatasetFormatExpected(format));
@ -660,8 +658,7 @@ impl Store {
for quad in self.iter() { for quad in self.iter() {
writer.write_quad(&quad?)?; writer.write_quad(&quad?)?;
} }
writer.finish()?; Ok(writer.finish()?)
Ok(())
} }
/// Returns all the store named graphs. /// Returns all the store named graphs.

@ -226,8 +226,7 @@ fn test_dump_dataset() -> Result<(), Box<dyn Error>> {
store.insert(q)?; store.insert(q)?;
} }
let mut buffer = Vec::new(); let buffer = store.dump_dataset(Vec::new(), RdfFormat::NQuads)?;
store.dump_dataset(&mut buffer, RdfFormat::NQuads)?;
assert_eq!( assert_eq!(
buffer.into_iter().filter(|c| *c == b'\n').count(), buffer.into_iter().filter(|c| *c == b'\n').count(),
NUMBER_OF_TRIPLES NUMBER_OF_TRIPLES

@ -128,7 +128,7 @@ pub fn serialize(input: &PyAny, output: PyObject, mime_type: &str, py: Python<'_
} else { } else {
PyWritable::from_data(output) PyWritable::from_data(output)
}; };
let mut writer = RdfSerializer::from_format(format).serialize_to_write(output); let mut writer = RdfSerializer::from_format(format).serialize_to_write(BufWriter::new(output));
for i in input.iter()? { for i in input.iter()? {
let i = i?; let i = i?;
if let Ok(triple) = i.extract::<PyRef<PyTriple>>() { if let Ok(triple) = i.extract::<PyRef<PyTriple>>() {
@ -145,7 +145,13 @@ pub fn serialize(input: &PyAny, output: PyObject, mime_type: &str, py: Python<'_
} }
.map_err(map_io_err)?; .map_err(map_io_err)?;
} }
writer.finish().map_err(map_io_err) writer
.finish()
.map_err(map_io_err)?
.into_inner()
.map_err(|e| map_io_err(e.into_error()))?
.close()
.map_err(map_io_err)
} }
#[pyclass(name = "QuadReader", module = "pyoxigraph")] #[pyclass(name = "QuadReader", module = "pyoxigraph")]
@ -202,19 +208,25 @@ impl Read for PyReadable {
} }
pub enum PyWritable { pub enum PyWritable {
Io(BufWriter<PyIo>), Io(PyIo),
File(BufWriter<File>), File(File),
} }
impl PyWritable { impl PyWritable {
pub fn from_file(file: &Path, py: Python<'_>) -> io::Result<Self> { pub fn from_file(file: &Path, py: Python<'_>) -> io::Result<Self> {
Ok(Self::File(BufWriter::new( Ok(Self::File(py.allow_threads(|| File::create(file))?))
py.allow_threads(|| File::create(file))?,
)))
} }
pub fn from_data(data: PyObject) -> Self { pub fn from_data(data: PyObject) -> Self {
Self::Io(BufWriter::new(PyIo(data))) Self::Io(PyIo(data))
}
pub fn close(mut self) -> io::Result<()> {
self.flush()?;
if let Self::File(file) = self {
file.sync_all()?;
}
Ok(())
} }
} }

@ -9,6 +9,7 @@ use oxigraph::sparql::Update;
use oxigraph::store::{self, LoaderError, SerializerError, StorageError, Store}; use oxigraph::store::{self, LoaderError, SerializerError, StorageError, Store};
use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*; use pyo3::prelude::*;
use std::io::BufWriter;
use std::path::PathBuf; use std::path::PathBuf;
/// RDF store. /// RDF store.
@ -537,12 +538,17 @@ impl PyStore {
PyWritable::from_data(output) PyWritable::from_data(output)
}; };
py.allow_threads(|| { py.allow_threads(|| {
let output = BufWriter::new(output);
if let Some(from_graph_name) = &from_graph_name { if let Some(from_graph_name) = &from_graph_name {
self.inner.dump_graph(output, format, from_graph_name) self.inner.dump_graph(output, format, from_graph_name)
} else { } else {
self.inner.dump_dataset(output, format) self.inner.dump_dataset(output, format)
} }
.map_err(map_serializer_error) .map_err(map_serializer_error)?
.into_inner()
.map_err(|e| map_io_err(e.into_error()))?
.close()
.map_err(map_io_err)
}) })
} }

@ -441,17 +441,16 @@ pub fn main() -> anyhow::Result<()> {
None None
}; };
if let Some(file) = file { if let Some(file) = file {
dump( close_file_writer(dump(
&store, &store,
BufWriter::new(File::create(&file).map_err(|e| { BufWriter::new(File::create(file)?),
anyhow!("Error while opening file {}: {e}", file.display())
})?),
format, format,
graph, graph,
) )?)?;
} else { } else {
dump(&store, stdout().lock(), format, graph) dump(&store, stdout().lock(), format, graph)?.flush()?;
} }
Ok(())
} }
Command::Query { Command::Query {
query, query,
@ -509,7 +508,7 @@ pub fn main() -> anyhow::Result<()> {
for solution in solutions { for solution in solutions {
writer.write(&solution?)?; writer.write(&solution?)?;
} }
writer.finish()?; close_file_writer(writer.finish()?)?;
} else { } else {
let mut writer = QueryResultsSerializer::from_format(format) let mut writer = QueryResultsSerializer::from_format(format)
.solutions_writer( .solutions_writer(
@ -519,8 +518,7 @@ pub fn main() -> anyhow::Result<()> {
for solution in solutions { for solution in solutions {
writer.write(&solution?)?; writer.write(&solution?)?;
} }
#[allow(clippy::let_underscore_must_use)] writer.finish()?.flush()?;
let _ = writer.finish()?;
} }
} }
QueryResults::Boolean(result) => { QueryResults::Boolean(result) => {
@ -542,14 +540,16 @@ pub fn main() -> anyhow::Result<()> {
bail!("The --results-format option must be set when writing to stdout") bail!("The --results-format option must be set when writing to stdout")
}; };
if let Some(results_file) = results_file { if let Some(results_file) = results_file {
close_file_writer(
QueryResultsSerializer::from_format(format).write_boolean_result( QueryResultsSerializer::from_format(format).write_boolean_result(
BufWriter::new(File::create(results_file)?), BufWriter::new(File::create(results_file)?),
result, result,
)?,
)?; )?;
} else { } else {
#[allow(clippy::let_underscore_must_use)] QueryResultsSerializer::from_format(format)
let _ = QueryResultsSerializer::from_format(format) .write_boolean_result(stdout().lock(), result)?
.write_boolean_result(stdout().lock(), result)?; .flush()?;
} }
} }
QueryResults::Graph(triples) => { QueryResults::Graph(triples) => {
@ -567,13 +567,13 @@ pub fn main() -> anyhow::Result<()> {
for triple in triples { for triple in triples {
writer.write_triple(triple?.as_ref())?; writer.write_triple(triple?.as_ref())?;
} }
writer.finish()?; close_file_writer(writer.finish()?)?;
} else { } else {
let mut writer = serializer.serialize_to_write(stdout().lock()); let mut writer = serializer.serialize_to_write(stdout().lock());
for triple in triples { for triple in triples {
writer.write_triple(triple?.as_ref())?; writer.write_triple(triple?.as_ref())?;
} }
writer.finish()?; writer.finish()?.flush()?;
} }
} }
} }
@ -585,13 +585,14 @@ pub fn main() -> anyhow::Result<()> {
.extension() .extension()
.and_then(OsStr::to_str) { .and_then(OsStr::to_str) {
Some("json") => { Some("json") => {
explanation.write_in_json(file)?; explanation.write_in_json(&mut file)?;
}, },
Some("txt") => { Some("txt") => {
write!(file, "{:?}", explanation)?; write!(file, "{:?}", explanation)?;
}, },
_ => bail!("The given explanation file {} must have an extension that is .json or .txt", explain_file.display()) _ => bail!("The given explanation file {} must have an extension that is .json or .txt", explain_file.display())
} }
close_file_writer(file)?;
} else if explain || stats { } else if explain || stats {
eprintln!("{:#?}", explanation); eprintln!("{:#?}", explanation);
} }
@ -648,19 +649,18 @@ fn bulk_load(
Ok(()) Ok(())
} }
fn dump( fn dump<W: Write>(
store: &Store, store: &Store,
writer: impl Write, writer: W,
format: RdfFormat, format: RdfFormat,
from_graph_name: Option<GraphNameRef<'_>>, from_graph_name: Option<GraphNameRef<'_>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<W> {
ensure!(format.supports_datasets() || from_graph_name.is_some(), "The --graph option is required when writing a format not supporting datasets like NTriples, Turtle or RDF/XML"); ensure!(format.supports_datasets() || from_graph_name.is_some(), "The --graph option is required when writing a format not supporting datasets like NTriples, Turtle or RDF/XML");
if let Some(from_graph_name) = from_graph_name { Ok(if let Some(from_graph_name) = from_graph_name {
store.dump_graph(writer, format, from_graph_name) store.dump_graph(writer, format, from_graph_name)
} else { } else {
store.dump_dataset(writer, format) store.dump_dataset(writer, format)
}?; }?)
Ok(())
} }
fn format_from_path<T>( fn format_from_path<T>(
@ -1631,6 +1631,14 @@ impl Write for ReadForWriteWriter {
} }
} }
fn close_file_writer(writer: BufWriter<File>) -> io::Result<()> {
let mut file = writer
.into_inner()
.map_err(io::IntoInnerError::into_error)?;
file.flush()?;
file.sync_all()
}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
fn systemd_notify_ready() -> io::Result<()> { fn systemd_notify_ready() -> io::Result<()> {
if let Some(path) = env::var_os("NOTIFY_SOCKET") { if let Some(path) = env::var_os("NOTIFY_SOCKET") {

Loading…
Cancel
Save