Adds an option to avoid aborting bulk loading on syntax error

pull/192/head
Tpt 3 years ago
parent 296a6340b6
commit 839375c2a4
  1. 75
      lib/src/store.rs
  2. 20
      lib/tests/store.rs
  3. 15
      server/src/main.rs

@ -776,6 +776,7 @@ impl Store {
pub fn bulk_loader(&self) -> BulkLoader { pub fn bulk_loader(&self) -> BulkLoader {
BulkLoader { BulkLoader {
storage: StorageBulkLoader::new(self.storage.clone()), storage: StorageBulkLoader::new(self.storage.clone()),
on_parse_error: None,
} }
} }
@ -1298,6 +1299,7 @@ impl Iterator for GraphNameIter {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub struct BulkLoader { pub struct BulkLoader {
storage: StorageBulkLoader, storage: StorageBulkLoader,
on_parse_error: Option<Box<dyn Fn(ParseError) -> Result<(), ParseError>>>,
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -1309,10 +1311,9 @@ impl BulkLoader {
/// By default this is the number of logical CPU cores provided by the system except if /// By default this is the number of logical CPU cores provided by the system except if
/// [`BulkLoader::set_max_memory_size_in_megabytes`] is set. In this case at least 1GB is reserved /// [`BulkLoader::set_max_memory_size_in_megabytes`] is set. In this case at least 1GB is reserved
/// per used thread. /// per used thread.
pub fn set_num_threads(self, num_threads: usize) -> Self { pub fn set_num_threads(mut self, num_threads: usize) -> Self {
Self { self.storage = self.storage.set_num_threads(num_threads);
storage: self.storage.set_num_threads(num_threads), self
}
} }
/// Sets the maximal amount of memory used by this operation. /// Sets the maximal amount of memory used by this operation.
@ -1324,19 +1325,29 @@ impl BulkLoader {
/// ///
/// By default, at most 1GB per used thread is used /// By default, at most 1GB per used thread is used
/// (i.e. at most 1GB times the number of available logical CPU cores in total). /// (i.e. at most 1GB times the number of available logical CPU cores in total).
pub fn set_max_memory_size_in_megabytes(self, max_memory_size: usize) -> Self { pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self {
Self { self.storage = self
storage: self .storage
.storage .set_max_memory_size_in_megabytes(max_memory_size);
.set_max_memory_size_in_megabytes(max_memory_size), self
}
} }
/// Adds a `callback` evaluated from time to time with the number of loaded triples. /// Adds a `callback` evaluated from time to time with the number of loaded triples.
pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self { pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self {
Self { self.storage = self.storage.on_progress(callback);
storage: self.storage.on_progress(callback), self
} }
/// Adds a `callback` catching all parse errors and choosing whether the parsing should continue
/// by returning `Ok` or fail by returning `Err`.
///
/// By default the parsing fails.
pub fn on_parse_error(
mut self,
callback: impl Fn(ParseError) -> Result<(), ParseError> + 'static,
) -> Self {
self.on_parse_error = Some(Box::new(callback));
self
} }
/// Loads a dataset file using the bulk loader. /// Loads a dataset file using the bulk loader.
@ -1378,7 +1389,21 @@ impl BulkLoader {
.with_base_iri(base_iri) .with_base_iri(base_iri)
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
} }
self.storage.load(parser.read_quads(reader)?) self.storage
.load(parser.read_quads(reader)?.filter_map(|r| match r {
Ok(q) => Some(Ok(q)),
Err(e) => {
if let Some(callback) = &self.on_parse_error {
if let Err(e) = callback(e) {
Some(Err(e))
} else {
None
}
} else {
Some(Err(e))
}
}
}))
} }
/// Loads a graph file using the bulk loader. /// Loads a graph file using the bulk loader.
@ -1422,11 +1447,21 @@ impl BulkLoader {
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?; .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
} }
let to_graph_name = to_graph_name.into(); let to_graph_name = to_graph_name.into();
self.storage.load( self.storage
parser .load(parser.read_triples(reader)?.filter_map(|r| match r {
.read_triples(reader)? Ok(q) => Some(Ok(q.in_graph(to_graph_name.into_owned()))),
.map(|r| r.map(|q| q.in_graph(to_graph_name.into_owned()))), Err(e) => {
) if let Some(callback) = &self.on_parse_error {
if let Err(e) = callback(e) {
Some(Err(e))
} else {
None
}
} else {
Some(Err(e))
}
}
}))
} }
/// Adds a set of quads using the bulk loader. /// Adds a set of quads using the bulk loader.

@ -126,6 +126,26 @@ fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
#[test]
fn test_bulk_load_graph_lenient() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store.bulk_loader().on_parse_error(|_| Ok(())).load_graph(
Cursor::new(b"<http://example.com> <http://example.com> <http://example.com##> .\n<http://example.com> <http://example.com> <http://example.com> ."),
GraphFormat::NTriples,
GraphNameRef::DefaultGraph,
None,
)?;
assert_eq!(store.len()?, 1);
assert!(store.contains(QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com"),
NamedNodeRef::new_unchecked("http://example.com"),
NamedNodeRef::new_unchecked("http://example.com"),
GraphNameRef::DefaultGraph
))?);
store.validate()?;
Ok(())
}
#[test] #[test]
fn test_load_dataset() -> Result<(), Box<dyn Error>> { fn test_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new()?; let store = Store::new()?;

@ -63,6 +63,11 @@ enum Command {
/// If multiple files are provided they are loaded in parallel. /// If multiple files are provided they are loaded in parallel.
#[clap(short, long, global = true)] #[clap(short, long, global = true)]
file: Vec<String>, file: Vec<String>,
/// Attempt to keep loading even if the data file is invalid.
///
/// Only works with N-Triples and N-Quads for now.
#[clap(long, global = true)]
lenient: bool,
}, },
} }
@ -76,7 +81,7 @@ pub fn main() -> std::io::Result<()> {
}?; }?;
match matches.command { match matches.command {
Command::Load { file } => { Command::Load { file, lenient } => {
let handles = file let handles = file
.iter() .iter()
.map(|file| { .map(|file| {
@ -85,7 +90,7 @@ pub fn main() -> std::io::Result<()> {
spawn(move || { spawn(move || {
let f = file.clone(); let f = file.clone();
let start = Instant::now(); let start = Instant::now();
let loader = store.bulk_loader().on_progress(move |size| { let mut loader = store.bulk_loader().on_progress(move |size| {
let elapsed = start.elapsed(); let elapsed = start.elapsed();
println!( println!(
"{} triples loaded in {}s ({} t/s) from {}", "{} triples loaded in {}s ({} t/s) from {}",
@ -95,6 +100,12 @@ pub fn main() -> std::io::Result<()> {
f f
) )
}); });
if lenient {
loader = loader.on_parse_error(|e| {
println!("Parsing error: {}", e);
Ok(())
})
}
if file.ends_with(".gz") { if file.ends_with(".gz") {
bulk_load( bulk_load(
loader, loader,

Loading…
Cancel
Save