diff --git a/js/src/store.rs b/js/src/store.rs
index 2ec83cd6..bffac273 100644
--- a/js/src/store.rs
+++ b/js/src/store.rs
@@ -181,14 +181,14 @@ impl JsStore {
         let Some(format) = RdfFormat::from_media_type(mime_type) else {
             return Err(format_err!("Not supported MIME type: {mime_type}"));
         };
-        let mut buffer = Vec::new();
-        if let Some(from_graph_name) = FROM_JS.with(|c| c.to_optional_term(from_graph_name))? {
-            self.store
-                .dump_graph(&mut buffer, format, &GraphName::try_from(from_graph_name)?)
-        } else {
-            self.store.dump_dataset(&mut buffer, format)
-        }
-        .map_err(to_err)?;
+        let buffer =
+            if let Some(from_graph_name) = FROM_JS.with(|c| c.to_optional_term(from_graph_name))? {
+                self.store
+                    .dump_graph(Vec::new(), format, &GraphName::try_from(from_graph_name)?)
+            } else {
+                self.store.dump_dataset(Vec::new(), format)
+            }
+            .map_err(to_err)?;
         String::from_utf8(buffer).map_err(to_err)
     }
 }
diff --git a/lib/oxrdfio/src/serializer.rs b/lib/oxrdfio/src/serializer.rs
index 35a9f29e..6ec4e666 100644
--- a/lib/oxrdfio/src/serializer.rs
+++ b/lib/oxrdfio/src/serializer.rs
@@ -19,7 +19,7 @@ use oxttl::turtle::ToTokioAsyncWriteTurtleWriter;
 use oxttl::turtle::{ToWriteTurtleWriter, TurtleSerializer};
 use std::io::{self, Write};
 #[cfg(feature = "async-tokio")]
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;

 /// A serializer for RDF serialization formats.
 ///
@@ -214,15 +214,16 @@ impl<W: Write> ToWriteQuadWriter<W> {
     }

     /// Writes the last bytes of the file
-    pub fn finish(self) -> io::Result<()> {
-        match self.formatter {
+    ///
+    /// Note that this function does not flush the writer. You need to do that if you are using a [`BufWriter`](io::BufWriter).
+    pub fn finish(self) -> io::Result<W> {
+        Ok(match self.formatter {
             ToWriteQuadWriterKind::NQuads(writer) => writer.finish(),
             ToWriteQuadWriterKind::NTriples(writer) => writer.finish(),
             ToWriteQuadWriterKind::RdfXml(writer) => writer.finish()?,
             ToWriteQuadWriterKind::TriG(writer) => writer.finish()?,
             ToWriteQuadWriterKind::Turtle(writer) => writer.finish()?,
-        }
-        .flush()
+        })
     }
 }

@@ -296,16 +297,16 @@ impl<W: AsyncWrite + Unpin> ToTokioAsyncWriteQuadWriter<W> {
     }

     /// Writes the last bytes of the file
-    pub async fn finish(self) -> io::Result<()> {
-        match self.formatter {
+    ///
+    /// Note that this function does not flush the writer. You need to do that if you are using a [`BufWriter`](io::BufWriter).
+    pub async fn finish(self) -> io::Result<W> {
+        Ok(match self.formatter {
             ToTokioAsyncWriteQuadWriterKind::NQuads(writer) => writer.finish(),
             ToTokioAsyncWriteQuadWriterKind::NTriples(writer) => writer.finish(),
             ToTokioAsyncWriteQuadWriterKind::RdfXml(writer) => writer.finish().await?,
             ToTokioAsyncWriteQuadWriterKind::TriG(writer) => writer.finish().await?,
             ToTokioAsyncWriteQuadWriterKind::Turtle(writer) => writer.finish().await?,
-        }
-        .flush()
-        .await
+        })
     }
 }

diff --git a/lib/src/io/write.rs b/lib/src/io/write.rs
index 7955f3a3..7c308c86 100644
--- a/lib/src/io/write.rs
+++ b/lib/src/io/write.rs
@@ -86,7 +86,7 @@ impl<W: Write> TripleWriter<W> {

     /// Writes the last bytes of the file
     pub fn finish(self) -> io::Result<()> {
-        self.writer.finish()
+        self.writer.finish()?.flush()
     }
 }

@@ -170,6 +170,6 @@ impl<W: Write> QuadWriter<W> {

     /// Writes the last bytes of the file
     pub fn finish(self) -> io::Result<()> {
-        self.writer.finish()
+        self.writer.finish()?.flush()
     }
 }
diff --git a/lib/src/store.rs b/lib/src/store.rs
index 0667f019..f8ea0af5 100644
--- a/lib/src/store.rs
+++ b/lib/src/store.rs
@@ -617,18 +617,17 @@ impl Store {
     /// assert_eq!(file, buffer.as_slice());
     /// # std::io::Result::Ok(())
     /// ```
-    pub fn dump_graph<'a>(
+    pub fn dump_graph<'a, W: Write>(
         &self,
-        write: impl Write,
+        write: W,
         format: impl Into<RdfFormat>,
         from_graph_name: impl Into<GraphNameRef<'a>>,
-    ) -> Result<(), SerializerError> {
+    ) -> Result<W, SerializerError> {
         let mut writer = RdfSerializer::from_format(format.into()).serialize_to_write(write);
         for quad in self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) {
             writer.write_triple(quad?.as_ref())?;
         }
-        writer.finish()?;
-        Ok(())
+        Ok(writer.finish()?)
     }

     /// Dumps the store into a file.
@@ -642,16 +641,15 @@ impl Store {
     /// let store = Store::new()?;
     /// store.load_dataset(file, RdfFormat::NQuads, None)?;
     ///
-    /// let mut buffer = Vec::new();
-    /// store.dump_dataset(&mut buffer, RdfFormat::NQuads)?;
+    /// let buffer = store.dump_dataset(Vec::new(), RdfFormat::NQuads)?;
     /// assert_eq!(file, buffer.as_slice());
     /// # std::io::Result::Ok(())
     /// ```
-    pub fn dump_dataset(
+    pub fn dump_dataset<W: Write>(
         &self,
-        write: impl Write,
+        write: W,
         format: impl Into<RdfFormat>,
-    ) -> Result<(), SerializerError> {
+    ) -> Result<W, SerializerError> {
         let format = format.into();
         if !format.supports_datasets() {
             return Err(SerializerError::DatasetFormatExpected(format));
@@ -660,8 +658,7 @@ impl Store {
         for quad in self.iter() {
             writer.write_quad(&quad?)?;
         }
-        writer.finish()?;
-        Ok(())
+        Ok(writer.finish()?)
     }

     /// Returns all the store named graphs.
diff --git a/lib/tests/store.rs b/lib/tests/store.rs
index 1b28a03d..3deeeaac 100644
--- a/lib/tests/store.rs
+++ b/lib/tests/store.rs
@@ -226,8 +226,7 @@ fn test_dump_dataset() -> Result<(), Box<dyn Error>> {
         store.insert(q)?;
     }

-    let mut buffer = Vec::new();
-    store.dump_dataset(&mut buffer, RdfFormat::NQuads)?;
+    let buffer = store.dump_dataset(Vec::new(), RdfFormat::NQuads)?;
     assert_eq!(
         buffer.into_iter().filter(|c| *c == b'\n').count(),
         NUMBER_OF_TRIPLES
diff --git a/python/src/io.rs b/python/src/io.rs
index 97bee867..9585dcdc 100644
--- a/python/src/io.rs
+++ b/python/src/io.rs
@@ -128,7 +128,7 @@ pub fn serialize(input: &PyAny, output: PyObject, mime_type: &str, py: Python<'_
     } else {
         PyWritable::from_data(output)
     };
-    let mut writer = RdfSerializer::from_format(format).serialize_to_write(output);
+    let mut writer = RdfSerializer::from_format(format).serialize_to_write(BufWriter::new(output));
     for i in input.iter()? {
         let i = i?;
         if let Ok(triple) = i.extract::<PyRef<PyTriple>>() {
@@ -145,7 +145,13 @@ pub fn serialize(input: &PyAny, output: PyObject, mime_type: &str, py: Python<'_
         }
         .map_err(map_io_err)?;
     }
-    writer.finish().map_err(map_io_err)
+    writer
+        .finish()
+        .map_err(map_io_err)?
+        .into_inner()
+        .map_err(|e| map_io_err(e.into_error()))?
+        .close()
+        .map_err(map_io_err)
 }

 #[pyclass(name = "QuadReader", module = "pyoxigraph")]
@@ -202,19 +208,25 @@ impl Read for PyReadable {
     }
 }

 pub enum PyWritable {
-    Io(BufWriter<PyIo>),
-    File(BufWriter<File>),
+    Io(PyIo),
+    File(File),
 }

 impl PyWritable {
     pub fn from_file(file: &Path, py: Python<'_>) -> io::Result<Self> {
-        Ok(Self::File(BufWriter::new(
-            py.allow_threads(|| File::create(file))?,
-        )))
+        Ok(Self::File(py.allow_threads(|| File::create(file))?))
     }

     pub fn from_data(data: PyObject) -> Self {
-        Self::Io(BufWriter::new(PyIo(data)))
+        Self::Io(PyIo(data))
+    }
+
+    pub fn close(mut self) -> io::Result<()> {
+        self.flush()?;
+        if let Self::File(file) = self {
+            file.sync_all()?;
+        }
+        Ok(())
     }
 }
diff --git a/python/src/store.rs b/python/src/store.rs
index d689c997..da20686a 100644
--- a/python/src/store.rs
+++ b/python/src/store.rs
@@ -9,6 +9,7 @@ use oxigraph::sparql::Update;
 use oxigraph::store::{self, LoaderError, SerializerError, StorageError, Store};
 use pyo3::exceptions::{PyRuntimeError, PyValueError};
 use pyo3::prelude::*;
+use std::io::BufWriter;
 use std::path::PathBuf;

 /// RDF store.
@@ -537,12 +538,17 @@ impl PyStore {
             PyWritable::from_data(output)
         };
         py.allow_threads(|| {
+            let output = BufWriter::new(output);
             if let Some(from_graph_name) = &from_graph_name {
                 self.inner.dump_graph(output, format, from_graph_name)
             } else {
                 self.inner.dump_dataset(output, format)
             }
-            .map_err(map_serializer_error)
+            .map_err(map_serializer_error)?
+            .into_inner()
+            .map_err(|e| map_io_err(e.into_error()))?
+            .close()
+            .map_err(map_io_err)
         })
     }

diff --git a/server/src/main.rs b/server/src/main.rs
index 5749d7f6..67a5d527 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -441,17 +441,16 @@ pub fn main() -> anyhow::Result<()> {
                 None
             };
             if let Some(file) = file {
-                dump(
+                close_file_writer(dump(
                     &store,
-                    BufWriter::new(File::create(&file).map_err(|e| {
-                        anyhow!("Error while opening file {}: {e}", file.display())
-                    })?),
+                    BufWriter::new(File::create(file)?),
                     format,
                     graph,
-                )
+                )?)?;
             } else {
-                dump(&store, stdout().lock(), format, graph)
+                dump(&store, stdout().lock(), format, graph)?.flush()?;
             }
+            Ok(())
         }
         Command::Query {
             query,
@@ -509,7 +508,7 @@ pub fn main() -> anyhow::Result<()> {
                         for solution in solutions {
                             writer.write(&solution?)?;
                         }
-                        writer.finish()?;
+                        close_file_writer(writer.finish()?)?;
                     } else {
                         let mut writer = QueryResultsSerializer::from_format(format)
                             .solutions_writer(
@@ -519,8 +518,7 @@ pub fn main() -> anyhow::Result<()> {
                         for solution in solutions {
                             writer.write(&solution?)?;
                         }
-                        #[allow(clippy::let_underscore_must_use)]
-                        let _ = writer.finish()?;
+                        writer.finish()?.flush()?;
                     }
                 }
                 QueryResults::Boolean(result) => {
@@ -542,14 +540,16 @@ pub fn main() -> anyhow::Result<()> {
                         bail!("The --results-format option must be set when writing to stdout")
                     };
                     if let Some(results_file) = results_file {
-                        QueryResultsSerializer::from_format(format).write_boolean_result(
-                            BufWriter::new(File::create(results_file)?),
-                            result,
+                        close_file_writer(
+                            QueryResultsSerializer::from_format(format).write_boolean_result(
+                                BufWriter::new(File::create(results_file)?),
+                                result,
+                            )?,
                         )?;
                     } else {
-                        #[allow(clippy::let_underscore_must_use)]
-                        let _ = QueryResultsSerializer::from_format(format)
-                            .write_boolean_result(stdout().lock(), result)?;
+                        QueryResultsSerializer::from_format(format)
+                            .write_boolean_result(stdout().lock(), result)?
+                            .flush()?;
                     }
                 }
                 QueryResults::Graph(triples) => {
@@ -567,13 +567,13 @@ pub fn main() -> anyhow::Result<()> {
                        for triple in triples {
                             writer.write_triple(triple?.as_ref())?;
                         }
-                        writer.finish()?;
+                        close_file_writer(writer.finish()?)?;
                     } else {
                         let mut writer = serializer.serialize_to_write(stdout().lock());
                         for triple in triples {
                             writer.write_triple(triple?.as_ref())?;
                         }
-                        writer.finish()?;
+                        writer.finish()?.flush()?;
                     }
                 }
             }
@@ -585,13 +585,14 @@ pub fn main() -> anyhow::Result<()> {
                         .extension()
                         .and_then(OsStr::to_str) {
                         Some("json") => {
-                            explanation.write_in_json(file)?;
+                            explanation.write_in_json(&mut file)?;
                         },
                         Some("txt") => {
                             write!(file, "{:?}", explanation)?;
                         },
                         _ => bail!("The given explanation file {} must have an extension that is .json or .txt", explain_file.display())
                     }
+                    close_file_writer(file)?;
                 } else if explain || stats {
                     eprintln!("{:#?}", explanation);
                 }
@@ -648,19 +649,18 @@ fn bulk_load(
     Ok(())
 }

-fn dump(
+fn dump<W: Write>(
     store: &Store,
-    writer: impl Write,
+    writer: W,
     format: RdfFormat,
     from_graph_name: Option<GraphNameRef<'_>>,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<W> {
     ensure!(format.supports_datasets() || from_graph_name.is_some(), "The --graph option is required when writing a format not supporting datasets like NTriples, Turtle or RDF/XML");
-    if let Some(from_graph_name) = from_graph_name {
+    Ok(if let Some(from_graph_name) = from_graph_name {
         store.dump_graph(writer, format, from_graph_name)
     } else {
         store.dump_dataset(writer, format)
-    }?;
-    Ok(())
+    }?)
 }

 fn format_from_path(
@@ -1631,6 +1631,14 @@ impl Write for ReadForWriteWriter {
     }
 }

+fn close_file_writer(writer: BufWriter<File>) -> io::Result<()> {
+    let mut file = writer
+        .into_inner()
+        .map_err(io::IntoInnerError::into_error)?;
+    file.flush()?;
+    file.sync_all()
+}
+
 #[cfg(target_os = "linux")]
 fn systemd_notify_ready() -> io::Result<()> {
     if let Some(path) = env::var_os("NOTIFY_SOCKET") {
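The resulting calling convention, sketched for downstream code (a minimal example based on the doc examples and call sites in this diff; the `oxigraph::io::RdfSerializer` / `oxigraph::store::Store` import paths and the `example` function name are assumptions for illustration, not part of the change): `dump_graph`/`dump_dataset` now take the writer by value and return it, and `finish()` hands back the inner writer so the caller decides when to flush.

use oxigraph::io::{RdfFormat, RdfSerializer};
use oxigraph::store::Store;
use std::io::{stdout, BufWriter, Write};

fn example() -> Result<(), Box<dyn std::error::Error>> {
    let store = Store::new()?;

    // dump_dataset takes ownership of the writer and hands it back,
    // so dumping into memory no longer needs a `&mut buffer` argument.
    let buffer: Vec<u8> = store.dump_dataset(Vec::new(), RdfFormat::NQuads)?;
    println!("dumped {} bytes", buffer.len());

    // finish() now returns the inner writer instead of flushing it;
    // flushing a BufWriter is the caller's responsibility.
    let mut writer = RdfSerializer::from_format(RdfFormat::NQuads)
        .serialize_to_write(BufWriter::new(stdout().lock()));
    for quad in store.iter() {
        writer.write_quad(&quad?)?;
    }
    writer.finish()?.flush()?;
    Ok(())
}

For file-backed output the diff applies the same idea through the new `close_file_writer` helper: recover the `File` with `BufWriter::into_inner`, then `flush` and `sync_all`.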