From 839375c2a4244a383c0b158768389b6dfba77d38 Mon Sep 17 00:00:00 2001 From: Tpt Date: Sun, 6 Mar 2022 20:43:41 +0100 Subject: [PATCH] Adds an option to avoid aborting bulk loading on syntax error --- lib/src/store.rs | 75 +++++++++++++++++++++++++++++++++------------- lib/tests/store.rs | 20 +++++++++++++ server/src/main.rs | 15 ++++++++-- 3 files changed, 88 insertions(+), 22 deletions(-) diff --git a/lib/src/store.rs b/lib/src/store.rs index 215479e3..ae1a8591 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -776,6 +776,7 @@ impl Store { pub fn bulk_loader(&self) -> BulkLoader { BulkLoader { storage: StorageBulkLoader::new(self.storage.clone()), + on_parse_error: None, } } @@ -1298,6 +1299,7 @@ impl Iterator for GraphNameIter { #[cfg(not(target_arch = "wasm32"))] pub struct BulkLoader { storage: StorageBulkLoader, + on_parse_error: Option Result<(), ParseError>>>, } #[cfg(not(target_arch = "wasm32"))] @@ -1309,10 +1311,9 @@ impl BulkLoader { /// By default this is the number of logical CPU cores provided by the system except if /// [`BulkLoader::set_max_memory_size_in_megabytes`] is set. In this case at least one 1GB is reserved /// per used thread. - pub fn set_num_threads(self, num_threads: usize) -> Self { - Self { - storage: self.storage.set_num_threads(num_threads), - } + pub fn set_num_threads(mut self, num_threads: usize) -> Self { + self.storage = self.storage.set_num_threads(num_threads); + self } /// Sets the maximal number of memory used by this operation. @@ -1324,19 +1325,29 @@ impl BulkLoader { /// /// By default, at most 1GB per used thread is used /// (i.e. at most GBs at the number of available logical CPU cores in total). - pub fn set_max_memory_size_in_megabytes(self, max_memory_size: usize) -> Self { - Self { - storage: self - .storage - .set_max_memory_size_in_megabytes(max_memory_size), - } + pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self { + self.storage = self + .storage + .set_max_memory_size_in_megabytes(max_memory_size); + self } /// Adds a `callback` evaluated from time to time with the number of loaded triples. - pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self { - Self { - storage: self.storage.on_progress(callback), - } + pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self { + self.storage = self.storage.on_progress(callback); + self + } + + /// Adds a `callback` catching all parse errors and choosing if the parsing should continue + /// by returning `Ok` or fail by returning `Err`. + /// + /// By default the parsing fails. + pub fn on_parse_error( + mut self, + callback: impl Fn(ParseError) -> Result<(), ParseError> + 'static, + ) -> Self { + self.on_parse_error = Some(Box::new(callback)); + self } /// Loads a dataset file using the bulk loader. @@ -1378,7 +1389,21 @@ impl BulkLoader { .with_base_iri(base_iri) .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; } - self.storage.load(parser.read_quads(reader)?) + self.storage + .load(parser.read_quads(reader)?.filter_map(|r| match r { + Ok(q) => Some(Ok(q)), + Err(e) => { + if let Some(callback) = &self.on_parse_error { + if let Err(e) = callback(e) { + Some(Err(e)) + } else { + None + } + } else { + Some(Err(e)) + } + } + })) } /// Loads a graph file using the bulk loader. @@ -1422,11 +1447,21 @@ impl BulkLoader { .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; } let to_graph_name = to_graph_name.into(); - self.storage.load( - parser - .read_triples(reader)? - .map(|r| r.map(|q| q.in_graph(to_graph_name.into_owned()))), - ) + self.storage + .load(parser.read_triples(reader)?.filter_map(|r| match r { + Ok(q) => Some(Ok(q.in_graph(to_graph_name.into_owned()))), + Err(e) => { + if let Some(callback) = &self.on_parse_error { + if let Err(e) = callback(e) { + Some(Err(e)) + } else { + None + } + } else { + Some(Err(e)) + } + } + })) } /// Adds a set of quads using the bulk loader. diff --git a/lib/tests/store.rs b/lib/tests/store.rs index f34d0ac0..693a22c2 100644 --- a/lib/tests/store.rs +++ b/lib/tests/store.rs @@ -126,6 +126,26 @@ fn test_bulk_load_graph() -> Result<(), Box> { Ok(()) } +#[test] +fn test_bulk_load_graph_lenient() -> Result<(), Box> { + let store = Store::new()?; + store.bulk_loader().on_parse_error(|_| Ok(())).load_graph( + Cursor::new(b" .\n ."), + GraphFormat::NTriples, + GraphNameRef::DefaultGraph, + None, + )?; + assert_eq!(store.len()?, 1); + assert!(store.contains(QuadRef::new( + NamedNodeRef::new_unchecked("http://example.com"), + NamedNodeRef::new_unchecked("http://example.com"), + NamedNodeRef::new_unchecked("http://example.com"), + GraphNameRef::DefaultGraph + ))?); + store.validate()?; + Ok(()) +} + #[test] fn test_load_dataset() -> Result<(), Box> { let store = Store::new()?; diff --git a/server/src/main.rs b/server/src/main.rs index bdcf6a12..1d6f324b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -63,6 +63,11 @@ enum Command { /// If multiple files are provided they are loaded in parallel. #[clap(short, long, global = true)] file: Vec, + /// Attempt to keep loading even if the data file is invalid. + /// + /// Only works with N-Triples and N-Quads for now. + #[clap(long, global = true)] + lenient: bool, }, } @@ -76,7 +81,7 @@ pub fn main() -> std::io::Result<()> { }?; match matches.command { - Command::Load { file } => { + Command::Load { file, lenient } => { let handles = file .iter() .map(|file| { @@ -85,7 +90,7 @@ pub fn main() -> std::io::Result<()> { spawn(move || { let f = file.clone(); let start = Instant::now(); - let loader = store.bulk_loader().on_progress(move |size| { + let mut loader = store.bulk_loader().on_progress(move |size| { let elapsed = start.elapsed(); println!( "{} triples loaded in {}s ({} t/s) from {}", @@ -95,6 +100,12 @@ pub fn main() -> std::io::Result<()> { f ) }); + if lenient { + loader = loader.on_parse_error(|e| { + println!("Parsing error: {}", e); + Ok(()) + }) + } if file.ends_with(".gz") { bulk_load( loader,