Python: adds Store.bulk_extend

pull/428/head
Tpt 2 years ago committed by Thomas Tanon
parent 0e00e8209a
commit fbcbd60c0e
  1. 4
      lib/src/storage/mod.rs
  2. 67
      lib/src/store.rs
  3. 53
      python/src/store.rs
  4. 19
      python/tests/test_store.py

@ -1237,9 +1237,9 @@ impl StorageBulkLoader {
}
#[allow(clippy::trait_duplication_in_bounds)]
pub fn load<EI, EO: From<StorageError> + From<EI>, I: IntoIterator<Item = Result<Quad, EI>>>(
pub fn load<EI, EO: From<StorageError> + From<EI>>(
&self,
quads: I,
quads: impl IntoIterator<Item = Result<Quad, EI>>,
) -> Result<(), EO> {
let system = System::new_all();
let cpu_count = min(4, system.physical_core_count().unwrap_or(2));

@ -1441,21 +1441,20 @@ impl BulkLoader {
.with_base_iri(base_iri)
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
}
self.storage
.load(parser.read_quads(reader)?.filter_map(|r| match r {
Ok(q) => Some(Ok(q)),
Err(e) => {
if let Some(callback) = &self.on_parse_error {
if let Err(e) = callback(e) {
Some(Err(e))
} else {
None
}
} else {
self.load_ok_quads(parser.read_quads(reader)?.filter_map(|r| match r {
Ok(q) => Some(Ok(q)),
Err(e) => {
if let Some(callback) = &self.on_parse_error {
if let Err(e) = callback(e) {
Some(Err(e))
} else {
None
}
} else {
Some(Err(e))
}
}))
}
}))
}
/// Loads a graph file using the bulk loader.
@ -1499,21 +1498,20 @@ impl BulkLoader {
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
}
let to_graph_name = to_graph_name.into();
self.storage
.load(parser.read_triples(reader)?.filter_map(|r| match r {
Ok(q) => Some(Ok(q.in_graph(to_graph_name.into_owned()))),
Err(e) => {
if let Some(callback) = &self.on_parse_error {
if let Err(e) = callback(e) {
Some(Err(e))
} else {
None
}
} else {
self.load_ok_quads(parser.read_triples(reader)?.filter_map(|r| match r {
Ok(q) => Some(Ok(q.in_graph(to_graph_name.into_owned()))),
Err(e) => {
if let Some(callback) = &self.on_parse_error {
if let Err(e) = callback(e) {
Some(Err(e))
} else {
None
}
} else {
Some(Err(e))
}
}))
}
}))
}
/// Adds a set of quads using the bulk loader.
@ -1523,9 +1521,26 @@ impl BulkLoader {
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details.
pub fn load_quads(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
pub fn load_quads(
&self,
quads: impl IntoIterator<Item = impl Into<Quad>>,
) -> Result<(), StorageError> {
self.load_ok_quads(quads.into_iter().map(Ok::<_, StorageError>))
}
/// Adds a set of quads using the bulk loader while breaking in the middle of the process in case of error.
///
/// Warning: This method is not atomic.
/// If the process fails in the middle of the file, only a part of the data may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details.
pub fn load_ok_quads<EI, EO: From<StorageError> + From<EI>>(
&self,
quads: impl IntoIterator<Item = Result<impl Into<Quad>, EI>>,
) -> Result<(), EO> {
self.storage
.load::<StorageError, _, _>(quads.into_iter().map(Ok))
.load(quads.into_iter().map(|q| q.map(|q| q.into())))
}
}

@ -132,8 +132,11 @@ impl PyStore {
/// Adds atomically a set of quads to this store.
///
/// Insertion is done in a transactional manner: either the full operation succeeds or nothing is written to the database.
/// The :py:func:`bulk_extend` method is also available for much faster loading of a large number of quads but without transactional guarantees.
///
/// :param quads: the quads to add.
/// :type quad: iterable(Quad)
/// :type quads: iterable(Quad)
/// :rtype: None
/// :raises IOError: if an I/O error happens during the quad insertion.
///
@ -152,6 +155,29 @@ impl PyStore {
})
}
/// Adds a set of quads to this store.
///
/// This function is designed to be as fast as possible **without** transactional guarantees.
/// Only a part of the data might be written to the store.
///
/// :param quads: the quads to add.
/// :type quads: iterable(Quad)
/// :rtype: None
/// :raises IOError: if an I/O error happens during the quad insertion.
///
/// >>> store = Store()
/// >>> store.bulk_extend([Quad(NamedNode('http://example.com'), NamedNode('http://example.com/p'), Literal('1'), NamedNode('http://example.com/g'))])
/// >>> list(store)
/// [<Quad subject=<NamedNode value=http://example.com> predicate=<NamedNode value=http://example.com/p> object=<Literal value=1 datatype=<NamedNode value=http://www.w3.org/2001/XMLSchema#string>> graph_name=<NamedNode value=http://example.com/g>>]
fn bulk_extend(&self, quads: &PyAny) -> PyResult<()> {
self.inner
.bulk_loader()
.load_ok_quads::<PyErr, PythonOrStorageError>(
quads.iter()?.map(|q| q?.extract::<PyQuad>()),
)?;
Ok(())
}
/// Removes a quad from the store.
///
/// :param quad: the quad to remove.
@ -839,3 +865,28 @@ pub(crate) fn map_serializer_error(error: SerializerError) -> PyErr {
SerializerError::Io(error) => PyIOError::new_err(error.to_string()),
}
}
enum PythonOrStorageError {
Python(PyErr),
Storage(StorageError),
}
impl From<PyErr> for PythonOrStorageError {
fn from(error: PyErr) -> Self {
Self::Python(error)
}
}
impl From<StorageError> for PythonOrStorageError {
fn from(error: StorageError) -> Self {
Self::Storage(error)
}
}
impl From<PythonOrStorageError> for PyErr {
fn from(error: PythonOrStorageError) -> Self {
match error {
PythonOrStorageError::Python(error) => error,
PythonOrStorageError::Storage(error) => map_storage_error(error),
}
}
}

@ -25,7 +25,24 @@ class TestStore(unittest.TestCase):
def test_extend(self) -> None:
store = Store()
store.extend((Quad(foo, bar, baz), Quad(foo, bar, baz, graph), Quad(foo, bar, baz, DefaultGraph())))
store.extend(
(
Quad(foo, bar, baz),
Quad(foo, bar, baz, graph),
Quad(foo, bar, baz, DefaultGraph()),
)
)
self.assertEqual(len(store), 2)
def test_bulk_extend(self) -> None:
store = Store()
store.bulk_extend(
(
Quad(foo, bar, baz),
Quad(foo, bar, baz, graph),
Quad(foo, bar, baz, DefaultGraph()),
)
)
self.assertEqual(len(store), 2)
def test_remove(self) -> None:

Loading…
Cancel
Save