From 595ebb17fe360f273e94f8f2c731da944ac4f3cb Mon Sep 17 00:00:00 2001 From: Tpt Date: Mon, 13 Mar 2023 21:23:05 +0100 Subject: [PATCH] Python: adds Store.bulk_extend --- lib/src/storage/mod.rs | 4 +-- lib/src/store.rs | 67 +++++++++++++++++++++++--------------- python/src/store.rs | 53 +++++++++++++++++++++++++++++- python/tests/test_store.py | 19 ++++++++++- 4 files changed, 113 insertions(+), 30 deletions(-) diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 0221660d..97b515c9 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -1237,9 +1237,9 @@ impl StorageBulkLoader { } #[allow(clippy::trait_duplication_in_bounds)] - pub fn load + From, I: IntoIterator>>( + pub fn load + From>( &self, - quads: I, + quads: impl IntoIterator>, ) -> Result<(), EO> { let system = System::new_all(); let cpu_count = min(4, system.physical_core_count().unwrap_or(2)); diff --git a/lib/src/store.rs b/lib/src/store.rs index 859bdd3f..b8e3f1ff 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -1441,21 +1441,20 @@ impl BulkLoader { .with_base_iri(base_iri) .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; } - self.storage - .load(parser.read_quads(reader)?.filter_map(|r| match r { - Ok(q) => Some(Ok(q)), - Err(e) => { - if let Some(callback) = &self.on_parse_error { - if let Err(e) = callback(e) { - Some(Err(e)) - } else { - None - } - } else { + self.load_ok_quads(parser.read_quads(reader)?.filter_map(|r| match r { + Ok(q) => Some(Ok(q)), + Err(e) => { + if let Some(callback) = &self.on_parse_error { + if let Err(e) = callback(e) { Some(Err(e)) + } else { + None } + } else { + Some(Err(e)) } - })) + } + })) } /// Loads a graph file using the bulk loader. @@ -1499,21 +1498,20 @@ impl BulkLoader { .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; } let to_graph_name = to_graph_name.into(); - self.storage - .load(parser.read_triples(reader)?.filter_map(|r| match r { - Ok(q) => Some(Ok(q.in_graph(to_graph_name.into_owned()))), - Err(e) => { - if let Some(callback) = &self.on_parse_error { - if let Err(e) = callback(e) { - Some(Err(e)) - } else { - None - } - } else { + self.load_ok_quads(parser.read_triples(reader)?.filter_map(|r| match r { + Ok(q) => Some(Ok(q.in_graph(to_graph_name.into_owned()))), + Err(e) => { + if let Some(callback) = &self.on_parse_error { + if let Err(e) = callback(e) { Some(Err(e)) + } else { + None } + } else { + Some(Err(e)) } - })) + } + })) } /// Adds a set of quads using the bulk loader. @@ -1523,9 +1521,26 @@ impl BulkLoader { /// Results might get weird if you delete data during the loading process. /// /// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details. - pub fn load_quads(&self, quads: impl IntoIterator) -> Result<(), StorageError> { + pub fn load_quads( + &self, + quads: impl IntoIterator>, + ) -> Result<(), StorageError> { + self.load_ok_quads(quads.into_iter().map(Ok::<_, StorageError>)) + } + + /// Adds a set of quads using the bulk loader while breaking in the middle of the process in case of error. + /// + /// Warning: This method is not atomic. + /// If the process fails in the middle of the file, only a part of the data may be written to the store. + /// Results might get weird if you delete data during the loading process. + /// + /// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details. + pub fn load_ok_quads + From>( + &self, + quads: impl IntoIterator, EI>>, + ) -> Result<(), EO> { self.storage - .load::(quads.into_iter().map(Ok)) + .load(quads.into_iter().map(|q| q.map(|q| q.into()))) } } diff --git a/python/src/store.rs b/python/src/store.rs index 997f176d..7d3d29a4 100644 --- a/python/src/store.rs +++ b/python/src/store.rs @@ -132,8 +132,11 @@ impl PyStore { /// Adds atomically a set of quads to this store. /// + /// Insertion is done in a transactional manner: either the full operation succeeds or nothing is written to the database. + /// The :py:func:`bulk_extend` method is also available for much faster loading of a large number of quads but without transactional guarantees. + /// /// :param quads: the quads to add. - /// :type quad: iterable(Quad) + /// :type quads: iterable(Quad) /// :rtype: None /// :raises IOError: if an I/O error happens during the quad insertion. /// @@ -152,6 +155,29 @@ impl PyStore { }) } + /// Adds a set of quads to this store. + /// + /// This function is designed to be as fast as possible **without** transactional guarantees. + /// Only a part of the data might be written to the store. + /// + /// :param quads: the quads to add. + /// :type quads: iterable(Quad) + /// :rtype: None + /// :raises IOError: if an I/O error happens during the quad insertion. + /// + /// >>> store = Store() + /// >>> store.bulk_extend([Quad(NamedNode('http://example.com'), NamedNode('http://example.com/p'), Literal('1'), NamedNode('http://example.com/g'))]) + /// >>> list(store) + /// [ predicate= object=> graph_name=>] + fn bulk_extend(&self, quads: &PyAny) -> PyResult<()> { + self.inner + .bulk_loader() + .load_ok_quads::( + quads.iter()?.map(|q| q?.extract::()), + )?; + Ok(()) + } + /// Removes a quad from the store. /// /// :param quad: the quad to remove. @@ -839,3 +865,28 @@ pub(crate) fn map_serializer_error(error: SerializerError) -> PyErr { SerializerError::Io(error) => PyIOError::new_err(error.to_string()), } } + +enum PythonOrStorageError { + Python(PyErr), + Storage(StorageError), +} + +impl From for PythonOrStorageError { + fn from(error: PyErr) -> Self { + Self::Python(error) + } +} + +impl From for PythonOrStorageError { + fn from(error: StorageError) -> Self { + Self::Storage(error) + } +} +impl From for PyErr { + fn from(error: PythonOrStorageError) -> Self { + match error { + PythonOrStorageError::Python(error) => error, + PythonOrStorageError::Storage(error) => map_storage_error(error), + } + } +} diff --git a/python/tests/test_store.py b/python/tests/test_store.py index 9b736bb6..a04ca979 100644 --- a/python/tests/test_store.py +++ b/python/tests/test_store.py @@ -25,7 +25,24 @@ class TestStore(unittest.TestCase): def test_extend(self) -> None: store = Store() - store.extend((Quad(foo, bar, baz), Quad(foo, bar, baz, graph), Quad(foo, bar, baz, DefaultGraph()))) + store.extend( + ( + Quad(foo, bar, baz), + Quad(foo, bar, baz, graph), + Quad(foo, bar, baz, DefaultGraph()), + ) + ) + self.assertEqual(len(store), 2) + + def test_bulk_extend(self) -> None: + store = Store() + store.bulk_extend( + ( + Quad(foo, bar, baz), + Quad(foo, bar, baz, graph), + Quad(foo, bar, baz, DefaultGraph()), + ) + ) self.assertEqual(len(store), 2) def test_remove(self) -> None: