Python: release GIL on some costly methods

SPARQL query is currently blocked by https://github.com/PyO3/pyo3/issues/2140
pull/190/head
Tpt 3 years ago
parent 9870ce4881
commit 1f6c1db400
  1. 56
      python/src/io.rs
  2. 13
      python/src/sparql.rs
  3. 286
      python/src/store.rs

@ -9,7 +9,7 @@ use pyo3::exceptions::{PyIOError, PySyntaxError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::wrap_pyfunction;
use std::io::{self, BufReader, Read, Write};
use std::io::{self, BufReader, BufWriter, Read, Write};
pub fn add_to_module(module: &PyModule) -> PyResult<()> {
module.add_wrapped(wrap_pyfunction!(parse))?;
@ -61,7 +61,7 @@ pub fn parse(
.map_err(|e| PyValueError::new_err(e.to_string()))?;
}
Ok(PyTripleReader {
inner: parser.read_triples(input).map_err(map_parse_error)?,
inner: py.allow_threads(|| parser.read_triples(input).map_err(map_parse_error))?,
}
.into_py(py))
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
@ -72,7 +72,7 @@ pub fn parse(
.map_err(|e| PyValueError::new_err(e.to_string()))?;
}
Ok(PyQuadReader {
inner: parser.read_quads(input).map_err(map_parse_error)?,
inner: py.allow_threads(|| parser.read_quads(input).map_err(map_parse_error))?,
}
.into_py(py))
} else {
@ -113,7 +113,7 @@ pub fn parse(
#[pyfunction]
#[pyo3(text_signature = "(input, output, /, mime_type, *, base_iri = None)")]
pub fn serialize(input: &PyAny, output: PyObject, mime_type: &str) -> PyResult<()> {
let output = PyFileLike::new(output);
let output = BufWriter::new(PyFileLike::new(output));
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
let mut writer = GraphSerializer::from_format(graph_format)
.triple_writer(output)
@ -155,11 +155,13 @@ impl PyTripleReader {
slf.into()
}
fn __next__(&mut self) -> PyResult<Option<PyTriple>> {
self.inner
.next()
.map(|q| Ok(q.map_err(map_parse_error)?.into()))
.transpose()
fn __next__(&mut self, py: Python<'_>) -> PyResult<Option<PyTriple>> {
py.allow_threads(|| {
self.inner
.next()
.map(|q| Ok(q.map_err(map_parse_error)?.into()))
.transpose()
})
}
}
@ -174,11 +176,13 @@ impl PyQuadReader {
slf.into()
}
fn __next__(&mut self) -> PyResult<Option<PyQuad>> {
self.inner
.next()
.map(|q| Ok(q.map_err(map_parse_error)?.into()))
.transpose()
fn __next__(&mut self, py: Python<'_>) -> PyResult<Option<PyQuad>> {
py.allow_threads(|| {
self.inner
.next()
.map(|q| Ok(q.map_err(map_parse_error)?.into()))
.transpose()
})
}
}
@ -245,3 +249,27 @@ pub(crate) fn map_parse_error(error: ParseError) -> PyErr {
ParseError::Io(error) => map_io_err(error),
}
}
/// Release the GIL
/// There should not be ANY use of pyo3 code inside of this method!!!
///
/// Code from pyo3: https://github.com/PyO3/pyo3/blob/a67180c8a42a0bc0fdc45b651b62c0644130cf47/src/python.rs#L366
#[allow(unsafe_code)]
pub(crate) fn allow_threads_unsafe<T>(f: impl FnOnce() -> T) -> T {
struct RestoreGuard {
tstate: *mut pyo3::ffi::PyThreadState,
}
impl Drop for RestoreGuard {
fn drop(&mut self) {
unsafe {
pyo3::ffi::PyEval_RestoreThread(self.tstate);
}
}
}
let _guard = RestoreGuard {
tstate: unsafe { pyo3::ffi::PyEval_SaveThread() },
};
f()
}

@ -1,4 +1,4 @@
use crate::io::{map_io_err, map_parse_error};
use crate::io::{allow_threads_unsafe, map_io_err, map_parse_error};
use crate::map_storage_error;
use crate::model::*;
use oxigraph::model::Term;
@ -15,7 +15,8 @@ pub fn parse_query(
default_graph: Option<&PyAny>,
named_graphs: Option<&PyAny>,
) -> PyResult<Query> {
let mut query = Query::parse(query, base_iri).map_err(|e| map_evaluation_error(e.into()))?;
let mut query = allow_threads_unsafe(|| Query::parse(query, base_iri))
.map_err(|e| map_evaluation_error(e.into()))?;
if use_default_graph_as_union && default_graph.is_some() {
return Err(PyValueError::new_err(
@ -183,9 +184,7 @@ impl PyQuerySolutions {
}
fn __next__(&mut self) -> PyResult<Option<PyQuerySolution>> {
Ok(self
.inner
.next()
Ok(allow_threads_unsafe(|| self.inner.next())
.transpose()
.map_err(map_evaluation_error)?
.map(move |inner| PyQuerySolution { inner }))
@ -210,9 +209,7 @@ impl PyQueryTriples {
}
fn __next__(&mut self) -> PyResult<Option<PyTriple>> {
Ok(self
.inner
.next()
Ok(allow_threads_unsafe(|| self.inner.next())
.transpose()
.map_err(map_evaluation_error)?
.map(|t| t.into()))

@ -1,16 +1,16 @@
#![allow(clippy::needless_option_as_deref)]
use crate::io::{map_parse_error, PyFileLike};
use crate::io::{allow_threads_unsafe, map_parse_error, PyFileLike};
use crate::model::*;
use crate::sparql::*;
use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::GraphNameRef;
use oxigraph::model::{GraphName, GraphNameRef};
use oxigraph::sparql::Update;
use oxigraph::store::{self, LoaderError, SerializerError, StorageError, Store};
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::{Py, PyRef};
use std::io::BufReader;
use std::io::{BufReader, BufWriter};
/// RDF store.
///
@ -43,14 +43,16 @@ pub struct PyStore {
#[pymethods]
impl PyStore {
#[new]
fn new(path: Option<&str>) -> PyResult<Self> {
Ok(Self {
inner: if let Some(path) = path {
Store::open(path)
} else {
Store::new()
}
.map_err(map_storage_error)?,
fn new(path: Option<&str>, py: Python<'_>) -> PyResult<Self> {
py.allow_threads(|| {
Ok(Self {
inner: if let Some(path) = path {
Store::open(path)
} else {
Store::new()
}
.map_err(map_storage_error)?,
})
})
}
@ -65,9 +67,11 @@ impl PyStore {
/// >>> list(store)
/// [<Quad subject=<NamedNode value=http://example.com> predicate=<NamedNode value=http://example.com/p> object=<Literal value=1 datatype=<NamedNode value=http://www.w3.org/2001/XMLSchema#string>> graph_name=<NamedNode value=http://example.com/g>>]
#[pyo3(text_signature = "($self, quad)")]
fn add(&self, quad: &PyQuad) -> PyResult<()> {
self.inner.insert(quad).map_err(map_storage_error)?;
Ok(())
fn add(&self, quad: &PyQuad, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
self.inner.insert(quad).map_err(map_storage_error)?;
Ok(())
})
}
/// Removes a quad from the store.
@ -83,9 +87,11 @@ impl PyStore {
/// >>> list(store)
/// []
#[pyo3(text_signature = "($self, quad)")]
fn remove(&self, quad: &PyQuad) -> PyResult<()> {
self.inner.remove(quad).map_err(map_storage_error)?;
Ok(())
fn remove(&self, quad: &PyQuad, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
self.inner.remove(quad).map_err(map_storage_error)?;
Ok(())
})
}
/// Looks for the quads matching a given pattern.
@ -190,7 +196,8 @@ impl PyStore {
default_graph,
named_graphs,
)?;
let results = self.inner.query(query).map_err(map_evaluation_error)?;
let results =
allow_threads_unsafe(|| self.inner.query(query)).map_err(map_evaluation_error)?;
query_results_to_python(py, results)
}
@ -229,9 +236,12 @@ impl PyStore {
/// []
#[pyo3(text_signature = "($self, update, *, base_iri)")]
#[args(update, "*", base_iri = "None")]
fn update(&self, update: &str, base_iri: Option<&str>) -> PyResult<()> {
let update = Update::parse(update, base_iri).map_err(|e| map_evaluation_error(e.into()))?;
self.inner.update(update).map_err(map_evaluation_error)
fn update(&self, update: &str, base_iri: Option<&str>, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
let update =
Update::parse(update, base_iri).map_err(|e| map_evaluation_error(e.into()))?;
self.inner.update(update).map_err(map_evaluation_error)
})
}
/// Loads an RDF serialization into the store.
@ -277,37 +287,40 @@ impl PyStore {
mime_type: &str,
base_iri: Option<&str>,
to_graph: Option<&PyAny>,
py: Python<'_>,
) -> PyResult<()> {
let to_graph_name = if let Some(graph_name) = to_graph {
Some(PyGraphNameRef::try_from(graph_name)?)
Some(GraphName::from(&PyGraphNameRef::try_from(graph_name)?))
} else {
None
};
let input = BufReader::new(PyFileLike::new(input));
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
self.inner
.load_graph(
input,
graph_format,
&to_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph),
base_iri,
)
.map_err(map_loader_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if to_graph_name.is_some() {
return Err(PyValueError::new_err(
"The target graph name parameter is not available for dataset formats",
));
py.allow_threads(|| {
let input = BufReader::new(PyFileLike::new(input));
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
self.inner
.load_graph(
input,
graph_format,
to_graph_name.as_ref().unwrap_or(&GraphName::DefaultGraph),
base_iri,
)
.map_err(map_loader_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if to_graph_name.is_some() {
return Err(PyValueError::new_err(
"The target graph name parameter is not available for dataset formats",
));
}
self.inner
.load_dataset(input, dataset_format, base_iri)
.map_err(map_loader_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
mime_type
)))
}
self.inner
.load_dataset(input, dataset_format, base_iri)
.map_err(map_loader_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
mime_type
)))
}
})
}
/// Loads an RDF serialization into the store.
@ -353,37 +366,40 @@ impl PyStore {
mime_type: &str,
base_iri: Option<&str>,
to_graph: Option<&PyAny>,
py: Python<'_>,
) -> PyResult<()> {
let to_graph_name = if let Some(graph_name) = to_graph {
Some(PyGraphNameRef::try_from(graph_name)?)
Some(GraphName::from(&PyGraphNameRef::try_from(graph_name)?))
} else {
None
};
let input = BufReader::new(PyFileLike::new(input));
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
self.inner
.bulk_load_graph(
input,
graph_format,
&to_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph),
base_iri,
)
.map_err(map_loader_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if to_graph_name.is_some() {
return Err(PyValueError::new_err(
"The target graph name parameter is not available for dataset formats",
));
py.allow_threads(|| {
let input = BufReader::new(PyFileLike::new(input));
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
self.inner
.bulk_load_graph(
input,
graph_format,
&to_graph_name.unwrap_or(GraphName::DefaultGraph),
base_iri,
)
.map_err(map_loader_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if to_graph_name.is_some() {
return Err(PyValueError::new_err(
"The target graph name parameter is not available for dataset formats",
));
}
self.inner
.bulk_load_dataset(input, dataset_format, base_iri)
.map_err(map_loader_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
mime_type
)))
}
self.inner
.bulk_load_dataset(input, dataset_format, base_iri)
.map_err(map_loader_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
mime_type
)))
}
})
}
/// Dumps the store quads or triples into a file.
@ -417,36 +433,44 @@ impl PyStore {
/// b'<http://example.com> <http://example.com/p> "1" .\n'
#[pyo3(text_signature = "($self, output, /, mime_type, *, from_graph = None)")]
#[args(output, mime_type, "*", from_graph = "None")]
fn dump(&self, output: PyObject, mime_type: &str, from_graph: Option<&PyAny>) -> PyResult<()> {
fn dump(
&self,
output: PyObject,
mime_type: &str,
from_graph: Option<&PyAny>,
py: Python<'_>,
) -> PyResult<()> {
let from_graph_name = if let Some(graph_name) = from_graph {
Some(PyGraphNameRef::try_from(graph_name)?)
Some(GraphName::from(&PyGraphNameRef::try_from(graph_name)?))
} else {
None
};
let output = PyFileLike::new(output);
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
self.inner
.dump_graph(
output,
graph_format,
&from_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph),
)
.map_err(map_serializer_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if from_graph_name.is_some() {
return Err(PyValueError::new_err(
"The target graph name parameter is not available for dataset formats",
));
py.allow_threads(|| {
let output = BufWriter::new(PyFileLike::new(output));
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
self.inner
.dump_graph(
output,
graph_format,
&from_graph_name.unwrap_or(GraphName::DefaultGraph),
)
.map_err(map_serializer_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if from_graph_name.is_some() {
return Err(PyValueError::new_err(
"The target graph name parameter is not available for dataset formats",
));
}
self.inner
.dump_dataset(output, dataset_format)
.map_err(map_serializer_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
mime_type
)))
}
self.inner
.dump_dataset(output, dataset_format)
.map_err(map_serializer_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
mime_type
)))
}
})
}
/// Returns an iterator over all the store named graphs.
@ -477,19 +501,20 @@ impl PyStore {
/// >>> list(store.named_graphs())
/// [<NamedNode value=http://example.com/g>]
#[pyo3(text_signature = "($self, graph_name)")]
fn add_graph(&self, graph_name: &PyAny) -> PyResult<()> {
match PyGraphNameRef::try_from(graph_name)? {
PyGraphNameRef::DefaultGraph => Ok(()),
PyGraphNameRef::NamedNode(graph_name) => self
.inner
.insert_named_graph(&PyNamedOrBlankNodeRef::NamedNode(graph_name))
.map(|_| ()),
PyGraphNameRef::BlankNode(graph_name) => self
.inner
.insert_named_graph(&PyNamedOrBlankNodeRef::BlankNode(graph_name))
.map(|_| ()),
}
.map_err(map_storage_error)
fn add_graph(&self, graph_name: &PyAny, py: Python<'_>) -> PyResult<()> {
let graph_name = GraphName::from(&PyGraphNameRef::try_from(graph_name)?);
py.allow_threads(|| {
match graph_name {
GraphName::DefaultGraph => Ok(()),
GraphName::NamedNode(graph_name) => {
self.inner.insert_named_graph(&graph_name).map(|_| ())
}
GraphName::BlankNode(graph_name) => {
self.inner.insert_named_graph(&graph_name).map(|_| ())
}
}
.map_err(map_storage_error)
})
}
/// Removes a graph from the store.
@ -506,20 +531,21 @@ impl PyStore {
/// >>> list(store)
/// []
#[pyo3(text_signature = "($self, graph_name)")]
fn remove_graph(&self, graph_name: &PyAny) -> PyResult<()> {
match PyGraphNameRef::try_from(graph_name)? {
PyGraphNameRef::DefaultGraph => self.inner.clear_graph(GraphNameRef::DefaultGraph),
PyGraphNameRef::NamedNode(graph_name) => self
.inner
.remove_named_graph(&PyNamedOrBlankNodeRef::NamedNode(graph_name))
.map(|_| ()),
PyGraphNameRef::BlankNode(graph_name) => self
.inner
.remove_named_graph(&PyNamedOrBlankNodeRef::BlankNode(graph_name))
.map(|_| ()),
}
.map_err(map_storage_error)?;
Ok(())
fn remove_graph(&self, graph_name: &PyAny, py: Python<'_>) -> PyResult<()> {
let graph_name = GraphName::from(&PyGraphNameRef::try_from(graph_name)?);
py.allow_threads(|| {
match graph_name {
GraphName::DefaultGraph => self.inner.clear_graph(GraphNameRef::DefaultGraph),
GraphName::NamedNode(graph_name) => {
self.inner.remove_named_graph(&graph_name).map(|_| ())
}
GraphName::BlankNode(graph_name) => {
self.inner.remove_named_graph(&graph_name).map(|_| ())
}
}
.map_err(map_storage_error)?;
Ok(())
})
}
/// Creates database backup into the `target_directory`.
@ -543,14 +569,16 @@ impl PyStore {
/// :type target_directory: str
/// :raises IOError: if an I/O error happens during the backup.
#[pyo3(text_signature = "($self, target_directory)")]
fn backup(&self, target_directory: &str) -> PyResult<()> {
self.inner
.backup(target_directory)
.map_err(map_storage_error)
fn backup(&self, target_directory: &str, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
self.inner
.backup(target_directory)
.map_err(map_storage_error)
})
}
fn __str__(&self) -> String {
self.inner.to_string()
fn __str__(&self, py: Python<'_>) -> String {
py.allow_threads(|| self.inner.to_string())
}
fn __bool__(&self) -> PyResult<bool> {

Loading…
Cancel
Save