From 1f6c1db400b124dc983171ac23c605d0fd9a32ea Mon Sep 17 00:00:00 2001 From: Tpt Date: Tue, 1 Feb 2022 22:32:40 +0100 Subject: [PATCH] Python: release GIL on some costly methods SPARQL query is currently blocked by https://github.com/PyO3/pyo3/issues/2140 --- python/src/io.rs | 56 ++++++--- python/src/sparql.rs | 13 +- python/src/store.rs | 286 ++++++++++++++++++++++++------------------- 3 files changed, 204 insertions(+), 151 deletions(-) diff --git a/python/src/io.rs b/python/src/io.rs index 9f1504e4..3a761caa 100644 --- a/python/src/io.rs +++ b/python/src/io.rs @@ -9,7 +9,7 @@ use pyo3::exceptions::{PyIOError, PySyntaxError, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyBytes; use pyo3::wrap_pyfunction; -use std::io::{self, BufReader, Read, Write}; +use std::io::{self, BufReader, BufWriter, Read, Write}; pub fn add_to_module(module: &PyModule) -> PyResult<()> { module.add_wrapped(wrap_pyfunction!(parse))?; @@ -61,7 +61,7 @@ pub fn parse( .map_err(|e| PyValueError::new_err(e.to_string()))?; } Ok(PyTripleReader { - inner: parser.read_triples(input).map_err(map_parse_error)?, + inner: py.allow_threads(|| parser.read_triples(input).map_err(map_parse_error))?, } .into_py(py)) } else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) { @@ -72,7 +72,7 @@ pub fn parse( .map_err(|e| PyValueError::new_err(e.to_string()))?; } Ok(PyQuadReader { - inner: parser.read_quads(input).map_err(map_parse_error)?, + inner: py.allow_threads(|| parser.read_quads(input).map_err(map_parse_error))?, } .into_py(py)) } else { @@ -113,7 +113,7 @@ pub fn parse( #[pyfunction] #[pyo3(text_signature = "(input, output, /, mime_type, *, base_iri = None)")] pub fn serialize(input: &PyAny, output: PyObject, mime_type: &str) -> PyResult<()> { - let output = PyFileLike::new(output); + let output = BufWriter::new(PyFileLike::new(output)); if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { let mut writer = GraphSerializer::from_format(graph_format) .triple_writer(output) @@ -155,11 +155,13 @@ impl PyTripleReader { slf.into() } - fn __next__(&mut self) -> PyResult> { - self.inner - .next() - .map(|q| Ok(q.map_err(map_parse_error)?.into())) - .transpose() + fn __next__(&mut self, py: Python<'_>) -> PyResult> { + py.allow_threads(|| { + self.inner + .next() + .map(|q| Ok(q.map_err(map_parse_error)?.into())) + .transpose() + }) } } @@ -174,11 +176,13 @@ impl PyQuadReader { slf.into() } - fn __next__(&mut self) -> PyResult> { - self.inner - .next() - .map(|q| Ok(q.map_err(map_parse_error)?.into())) - .transpose() + fn __next__(&mut self, py: Python<'_>) -> PyResult> { + py.allow_threads(|| { + self.inner + .next() + .map(|q| Ok(q.map_err(map_parse_error)?.into())) + .transpose() + }) } } @@ -245,3 +249,27 @@ pub(crate) fn map_parse_error(error: ParseError) -> PyErr { ParseError::Io(error) => map_io_err(error), } } + +/// Release the GIL +/// There should not be ANY use of pyo3 code inside of this method!!! +/// +/// Code from pyo3: https://github.com/PyO3/pyo3/blob/a67180c8a42a0bc0fdc45b651b62c0644130cf47/src/python.rs#L366 +#[allow(unsafe_code)] +pub(crate) fn allow_threads_unsafe(f: impl FnOnce() -> T) -> T { + struct RestoreGuard { + tstate: *mut pyo3::ffi::PyThreadState, + } + + impl Drop for RestoreGuard { + fn drop(&mut self) { + unsafe { + pyo3::ffi::PyEval_RestoreThread(self.tstate); + } + } + } + + let _guard = RestoreGuard { + tstate: unsafe { pyo3::ffi::PyEval_SaveThread() }, + }; + f() +} diff --git a/python/src/sparql.rs b/python/src/sparql.rs index a1a84f23..06344773 100644 --- a/python/src/sparql.rs +++ b/python/src/sparql.rs @@ -1,4 +1,4 @@ -use crate::io::{map_io_err, map_parse_error}; +use crate::io::{allow_threads_unsafe, map_io_err, map_parse_error}; use crate::map_storage_error; use crate::model::*; use oxigraph::model::Term; @@ -15,7 +15,8 @@ pub fn parse_query( default_graph: Option<&PyAny>, named_graphs: Option<&PyAny>, ) -> PyResult { - let mut query = Query::parse(query, base_iri).map_err(|e| map_evaluation_error(e.into()))?; + let mut query = allow_threads_unsafe(|| Query::parse(query, base_iri)) + .map_err(|e| map_evaluation_error(e.into()))?; if use_default_graph_as_union && default_graph.is_some() { return Err(PyValueError::new_err( @@ -183,9 +184,7 @@ impl PyQuerySolutions { } fn __next__(&mut self) -> PyResult> { - Ok(self - .inner - .next() + Ok(allow_threads_unsafe(|| self.inner.next()) .transpose() .map_err(map_evaluation_error)? .map(move |inner| PyQuerySolution { inner })) @@ -210,9 +209,7 @@ impl PyQueryTriples { } fn __next__(&mut self) -> PyResult> { - Ok(self - .inner - .next() + Ok(allow_threads_unsafe(|| self.inner.next()) .transpose() .map_err(map_evaluation_error)? .map(|t| t.into())) diff --git a/python/src/store.rs b/python/src/store.rs index e95a3c6c..f059c479 100644 --- a/python/src/store.rs +++ b/python/src/store.rs @@ -1,16 +1,16 @@ #![allow(clippy::needless_option_as_deref)] -use crate::io::{map_parse_error, PyFileLike}; +use crate::io::{allow_threads_unsafe, map_parse_error, PyFileLike}; use crate::model::*; use crate::sparql::*; use oxigraph::io::{DatasetFormat, GraphFormat}; -use oxigraph::model::GraphNameRef; +use oxigraph::model::{GraphName, GraphNameRef}; use oxigraph::sparql::Update; use oxigraph::store::{self, LoaderError, SerializerError, StorageError, Store}; use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::{Py, PyRef}; -use std::io::BufReader; +use std::io::{BufReader, BufWriter}; /// RDF store. /// @@ -43,14 +43,16 @@ pub struct PyStore { #[pymethods] impl PyStore { #[new] - fn new(path: Option<&str>) -> PyResult { - Ok(Self { - inner: if let Some(path) = path { - Store::open(path) - } else { - Store::new() - } - .map_err(map_storage_error)?, + fn new(path: Option<&str>, py: Python<'_>) -> PyResult { + py.allow_threads(|| { + Ok(Self { + inner: if let Some(path) = path { + Store::open(path) + } else { + Store::new() + } + .map_err(map_storage_error)?, + }) }) } @@ -65,9 +67,11 @@ impl PyStore { /// >>> list(store) /// [ predicate= object=> graph_name=>] #[pyo3(text_signature = "($self, quad)")] - fn add(&self, quad: &PyQuad) -> PyResult<()> { - self.inner.insert(quad).map_err(map_storage_error)?; - Ok(()) + fn add(&self, quad: &PyQuad, py: Python<'_>) -> PyResult<()> { + py.allow_threads(|| { + self.inner.insert(quad).map_err(map_storage_error)?; + Ok(()) + }) } /// Removes a quad from the store. @@ -83,9 +87,11 @@ impl PyStore { /// >>> list(store) /// [] #[pyo3(text_signature = "($self, quad)")] - fn remove(&self, quad: &PyQuad) -> PyResult<()> { - self.inner.remove(quad).map_err(map_storage_error)?; - Ok(()) + fn remove(&self, quad: &PyQuad, py: Python<'_>) -> PyResult<()> { + py.allow_threads(|| { + self.inner.remove(quad).map_err(map_storage_error)?; + Ok(()) + }) } /// Looks for the quads matching a given pattern. @@ -190,7 +196,8 @@ impl PyStore { default_graph, named_graphs, )?; - let results = self.inner.query(query).map_err(map_evaluation_error)?; + let results = + allow_threads_unsafe(|| self.inner.query(query)).map_err(map_evaluation_error)?; query_results_to_python(py, results) } @@ -229,9 +236,12 @@ impl PyStore { /// [] #[pyo3(text_signature = "($self, update, *, base_iri)")] #[args(update, "*", base_iri = "None")] - fn update(&self, update: &str, base_iri: Option<&str>) -> PyResult<()> { - let update = Update::parse(update, base_iri).map_err(|e| map_evaluation_error(e.into()))?; - self.inner.update(update).map_err(map_evaluation_error) + fn update(&self, update: &str, base_iri: Option<&str>, py: Python<'_>) -> PyResult<()> { + py.allow_threads(|| { + let update = + Update::parse(update, base_iri).map_err(|e| map_evaluation_error(e.into()))?; + self.inner.update(update).map_err(map_evaluation_error) + }) } /// Loads an RDF serialization into the store. @@ -277,37 +287,40 @@ impl PyStore { mime_type: &str, base_iri: Option<&str>, to_graph: Option<&PyAny>, + py: Python<'_>, ) -> PyResult<()> { let to_graph_name = if let Some(graph_name) = to_graph { - Some(PyGraphNameRef::try_from(graph_name)?) + Some(GraphName::from(&PyGraphNameRef::try_from(graph_name)?)) } else { None }; - let input = BufReader::new(PyFileLike::new(input)); - if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { - self.inner - .load_graph( - input, - graph_format, - &to_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph), - base_iri, - ) - .map_err(map_loader_error) - } else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) { - if to_graph_name.is_some() { - return Err(PyValueError::new_err( - "The target graph name parameter is not available for dataset formats", - )); + py.allow_threads(|| { + let input = BufReader::new(PyFileLike::new(input)); + if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { + self.inner + .load_graph( + input, + graph_format, + to_graph_name.as_ref().unwrap_or(&GraphName::DefaultGraph), + base_iri, + ) + .map_err(map_loader_error) + } else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) { + if to_graph_name.is_some() { + return Err(PyValueError::new_err( + "The target graph name parameter is not available for dataset formats", + )); + } + self.inner + .load_dataset(input, dataset_format, base_iri) + .map_err(map_loader_error) + } else { + Err(PyValueError::new_err(format!( + "Not supported MIME type: {}", + mime_type + ))) } - self.inner - .load_dataset(input, dataset_format, base_iri) - .map_err(map_loader_error) - } else { - Err(PyValueError::new_err(format!( - "Not supported MIME type: {}", - mime_type - ))) - } + }) } /// Loads an RDF serialization into the store. @@ -353,37 +366,40 @@ impl PyStore { mime_type: &str, base_iri: Option<&str>, to_graph: Option<&PyAny>, + py: Python<'_>, ) -> PyResult<()> { let to_graph_name = if let Some(graph_name) = to_graph { - Some(PyGraphNameRef::try_from(graph_name)?) + Some(GraphName::from(&PyGraphNameRef::try_from(graph_name)?)) } else { None }; - let input = BufReader::new(PyFileLike::new(input)); - if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { - self.inner - .bulk_load_graph( - input, - graph_format, - &to_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph), - base_iri, - ) - .map_err(map_loader_error) - } else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) { - if to_graph_name.is_some() { - return Err(PyValueError::new_err( - "The target graph name parameter is not available for dataset formats", - )); + py.allow_threads(|| { + let input = BufReader::new(PyFileLike::new(input)); + if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { + self.inner + .bulk_load_graph( + input, + graph_format, + &to_graph_name.unwrap_or(GraphName::DefaultGraph), + base_iri, + ) + .map_err(map_loader_error) + } else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) { + if to_graph_name.is_some() { + return Err(PyValueError::new_err( + "The target graph name parameter is not available for dataset formats", + )); + } + self.inner + .bulk_load_dataset(input, dataset_format, base_iri) + .map_err(map_loader_error) + } else { + Err(PyValueError::new_err(format!( + "Not supported MIME type: {}", + mime_type + ))) } - self.inner - .bulk_load_dataset(input, dataset_format, base_iri) - .map_err(map_loader_error) - } else { - Err(PyValueError::new_err(format!( - "Not supported MIME type: {}", - mime_type - ))) - } + }) } /// Dumps the store quads or triples into a file. @@ -417,36 +433,44 @@ impl PyStore { /// b' "1" .\n' #[pyo3(text_signature = "($self, output, /, mime_type, *, from_graph = None)")] #[args(output, mime_type, "*", from_graph = "None")] - fn dump(&self, output: PyObject, mime_type: &str, from_graph: Option<&PyAny>) -> PyResult<()> { + fn dump( + &self, + output: PyObject, + mime_type: &str, + from_graph: Option<&PyAny>, + py: Python<'_>, + ) -> PyResult<()> { let from_graph_name = if let Some(graph_name) = from_graph { - Some(PyGraphNameRef::try_from(graph_name)?) + Some(GraphName::from(&PyGraphNameRef::try_from(graph_name)?)) } else { None }; - let output = PyFileLike::new(output); - if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { - self.inner - .dump_graph( - output, - graph_format, - &from_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph), - ) - .map_err(map_serializer_error) - } else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) { - if from_graph_name.is_some() { - return Err(PyValueError::new_err( - "The target graph name parameter is not available for dataset formats", - )); + py.allow_threads(|| { + let output = BufWriter::new(PyFileLike::new(output)); + if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { + self.inner + .dump_graph( + output, + graph_format, + &from_graph_name.unwrap_or(GraphName::DefaultGraph), + ) + .map_err(map_serializer_error) + } else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) { + if from_graph_name.is_some() { + return Err(PyValueError::new_err( + "The target graph name parameter is not available for dataset formats", + )); + } + self.inner + .dump_dataset(output, dataset_format) + .map_err(map_serializer_error) + } else { + Err(PyValueError::new_err(format!( + "Not supported MIME type: {}", + mime_type + ))) } - self.inner - .dump_dataset(output, dataset_format) - .map_err(map_serializer_error) - } else { - Err(PyValueError::new_err(format!( - "Not supported MIME type: {}", - mime_type - ))) - } + }) } /// Returns an iterator over all the store named graphs. @@ -477,19 +501,20 @@ impl PyStore { /// >>> list(store.named_graphs()) /// [] #[pyo3(text_signature = "($self, graph_name)")] - fn add_graph(&self, graph_name: &PyAny) -> PyResult<()> { - match PyGraphNameRef::try_from(graph_name)? { - PyGraphNameRef::DefaultGraph => Ok(()), - PyGraphNameRef::NamedNode(graph_name) => self - .inner - .insert_named_graph(&PyNamedOrBlankNodeRef::NamedNode(graph_name)) - .map(|_| ()), - PyGraphNameRef::BlankNode(graph_name) => self - .inner - .insert_named_graph(&PyNamedOrBlankNodeRef::BlankNode(graph_name)) - .map(|_| ()), - } - .map_err(map_storage_error) + fn add_graph(&self, graph_name: &PyAny, py: Python<'_>) -> PyResult<()> { + let graph_name = GraphName::from(&PyGraphNameRef::try_from(graph_name)?); + py.allow_threads(|| { + match graph_name { + GraphName::DefaultGraph => Ok(()), + GraphName::NamedNode(graph_name) => { + self.inner.insert_named_graph(&graph_name).map(|_| ()) + } + GraphName::BlankNode(graph_name) => { + self.inner.insert_named_graph(&graph_name).map(|_| ()) + } + } + .map_err(map_storage_error) + }) } /// Removes a graph from the store. @@ -506,20 +531,21 @@ impl PyStore { /// >>> list(store) /// [] #[pyo3(text_signature = "($self, graph_name)")] - fn remove_graph(&self, graph_name: &PyAny) -> PyResult<()> { - match PyGraphNameRef::try_from(graph_name)? { - PyGraphNameRef::DefaultGraph => self.inner.clear_graph(GraphNameRef::DefaultGraph), - PyGraphNameRef::NamedNode(graph_name) => self - .inner - .remove_named_graph(&PyNamedOrBlankNodeRef::NamedNode(graph_name)) - .map(|_| ()), - PyGraphNameRef::BlankNode(graph_name) => self - .inner - .remove_named_graph(&PyNamedOrBlankNodeRef::BlankNode(graph_name)) - .map(|_| ()), - } - .map_err(map_storage_error)?; - Ok(()) + fn remove_graph(&self, graph_name: &PyAny, py: Python<'_>) -> PyResult<()> { + let graph_name = GraphName::from(&PyGraphNameRef::try_from(graph_name)?); + py.allow_threads(|| { + match graph_name { + GraphName::DefaultGraph => self.inner.clear_graph(GraphNameRef::DefaultGraph), + GraphName::NamedNode(graph_name) => { + self.inner.remove_named_graph(&graph_name).map(|_| ()) + } + GraphName::BlankNode(graph_name) => { + self.inner.remove_named_graph(&graph_name).map(|_| ()) + } + } + .map_err(map_storage_error)?; + Ok(()) + }) } /// Creates database backup into the `target_directory`. @@ -543,14 +569,16 @@ impl PyStore { /// :type target_directory: str /// :raises IOError: if an I/O error happens during the backup. #[pyo3(text_signature = "($self, target_directory)")] - fn backup(&self, target_directory: &str) -> PyResult<()> { - self.inner - .backup(target_directory) - .map_err(map_storage_error) + fn backup(&self, target_directory: &str, py: Python<'_>) -> PyResult<()> { + py.allow_threads(|| { + self.inner + .backup(target_directory) + .map_err(map_storage_error) + }) } - fn __str__(&self) -> String { - self.inner.to_string() + fn __str__(&self, py: Python<'_>) -> String { + py.allow_threads(|| self.inner.to_string()) } fn __bool__(&self) -> PyResult {