diff --git a/Cargo.lock b/Cargo.lock index 0faf78d1..660c881f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -827,6 +827,7 @@ dependencies = [ "oxigraph", "oxiri", "rand", + "rayon-core", "sparesults", "url", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index 6808ab20..1a1801ff 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -20,3 +20,4 @@ rand = "0.8" url = "2" oxiri = "0.2" flate2 = "1" +rayon-core = "1" diff --git a/server/src/main.rs b/server/src/main.rs index d8d94d36..131c50f7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -8,16 +8,17 @@ use oxigraph::sparql::{Query, QueryResults, Update}; use oxigraph::store::{BulkLoader, Store}; use oxiri::Iri; use rand::random; +use rayon_core::ThreadPoolBuilder; use sparesults::{QueryResultsFormat, QueryResultsSerializer}; use std::cell::RefCell; -use std::cmp::min; +use std::cmp::{max, min}; use std::fmt; use std::fs::File; -use std::io::{self, BufReader, ErrorKind, Read, Write}; +use std::io::{self, BufReader, Error, ErrorKind, Read, Write}; use std::path::PathBuf; use std::rc::Rc; use std::str::FromStr; -use std::thread::{spawn, JoinHandle}; +use std::thread::available_parallelism; use std::time::{Duration, Instant}; use url::form_urlencoded; @@ -70,45 +71,58 @@ pub fn main() -> std::io::Result<()> { match matches.command { Command::Load { file, lenient } => { - let handles = file - .iter() - .map(|file| { - let store = store.clone(); - let file = file.to_string(); - spawn(move || { - let f = file.clone(); - let start = Instant::now(); - let mut loader = store.bulk_loader().on_progress(move |size| { - let elapsed = start.elapsed(); - eprintln!( - "{} triples loaded in {}s ({} t/s) from {}", - size, - elapsed.as_secs(), - ((size as f64) / elapsed.as_secs_f64()).round(), - f - ) - }); - if lenient { - loader = loader.on_parse_error(|e| { - eprintln!("Parsing error: {}", e); - Ok(()) - }) - } - if file.ends_with(".gz") { - bulk_load( - loader, - &file[..file.len() - 3], - MultiGzDecoder::new(File::open(&file)?), - ) - } else { - bulk_load(loader, &file, File::open(&file)?) - } - }) - }) - .collect::>>>(); - for handle in handles { - handle.join().unwrap()?; - } + ThreadPoolBuilder::new() + .num_threads(max(1, available_parallelism()?.get() / 2)) + .thread_name(|i| format!("Oxigraph bulk loader thread {}", i)) + .build() + .map_err(|e| Error::new(ErrorKind::Other, e))? + .scope(|s| { + for file in file { + let store = store.clone(); + let file = file.to_string(); + s.spawn(move |_| { + let f = file.clone(); + let start = Instant::now(); + let mut loader = store.bulk_loader().on_progress(move |size| { + let elapsed = start.elapsed(); + eprintln!( + "{} triples loaded in {}s ({} t/s) from {}", + size, + elapsed.as_secs(), + ((size as f64) / elapsed.as_secs_f64()).round(), + f + ) + }); + if lenient { + let f = file.clone(); + loader = loader.on_parse_error(move |e| { + eprintln!("Parsing error on file {}: {}", f, e); + Ok(()) + }) + } + let fp = match File::open(&file) { + Ok(fp) => fp, + Err(error) => { + eprintln!("Error while opening file {}: {}", file, error); + return; + } + }; + if let Err(error) = { + if file.ends_with(".gz") { + bulk_load( + loader, + &file[..file.len() - 3], + MultiGzDecoder::new(fp), + ) + } else { + bulk_load(loader, &file, fp) + } + } { + eprintln!("Error while loading file {}: {}", file, error) + } + }) + } + }); Ok(()) } Command::Serve { bind } => {