Server: limits file loading concurrency

Bug #210
pull/216/head
Tpt 3 years ago
parent 1bca9436df
commit 06fa80d24e
  1. 1
      Cargo.lock
  2. 1
      server/Cargo.toml
  3. 98
      server/src/main.rs

1
Cargo.lock generated

@ -827,6 +827,7 @@ dependencies = [
"oxigraph", "oxigraph",
"oxiri", "oxiri",
"rand", "rand",
"rayon-core",
"sparesults", "sparesults",
"url", "url",
] ]

@ -20,3 +20,4 @@ rand = "0.8"
url = "2" url = "2"
oxiri = "0.2" oxiri = "0.2"
flate2 = "1" flate2 = "1"
rayon-core = "1"

@ -8,16 +8,17 @@ use oxigraph::sparql::{Query, QueryResults, Update};
use oxigraph::store::{BulkLoader, Store}; use oxigraph::store::{BulkLoader, Store};
use oxiri::Iri; use oxiri::Iri;
use rand::random; use rand::random;
use rayon_core::ThreadPoolBuilder;
use sparesults::{QueryResultsFormat, QueryResultsSerializer}; use sparesults::{QueryResultsFormat, QueryResultsSerializer};
use std::cell::RefCell; use std::cell::RefCell;
use std::cmp::min; use std::cmp::{max, min};
use std::fmt; use std::fmt;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader, ErrorKind, Read, Write}; use std::io::{self, BufReader, Error, ErrorKind, Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::rc::Rc; use std::rc::Rc;
use std::str::FromStr; use std::str::FromStr;
use std::thread::{spawn, JoinHandle}; use std::thread::available_parallelism;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use url::form_urlencoded; use url::form_urlencoded;
@ -70,45 +71,58 @@ pub fn main() -> std::io::Result<()> {
match matches.command { match matches.command {
Command::Load { file, lenient } => { Command::Load { file, lenient } => {
let handles = file ThreadPoolBuilder::new()
.iter() .num_threads(max(1, available_parallelism()?.get() / 2))
.map(|file| { .thread_name(|i| format!("Oxigraph bulk loader thread {}", i))
let store = store.clone(); .build()
let file = file.to_string(); .map_err(|e| Error::new(ErrorKind::Other, e))?
spawn(move || { .scope(|s| {
let f = file.clone(); for file in file {
let start = Instant::now(); let store = store.clone();
let mut loader = store.bulk_loader().on_progress(move |size| { let file = file.to_string();
let elapsed = start.elapsed(); s.spawn(move |_| {
eprintln!( let f = file.clone();
"{} triples loaded in {}s ({} t/s) from {}", let start = Instant::now();
size, let mut loader = store.bulk_loader().on_progress(move |size| {
elapsed.as_secs(), let elapsed = start.elapsed();
((size as f64) / elapsed.as_secs_f64()).round(), eprintln!(
f "{} triples loaded in {}s ({} t/s) from {}",
) size,
}); elapsed.as_secs(),
if lenient { ((size as f64) / elapsed.as_secs_f64()).round(),
loader = loader.on_parse_error(|e| { f
eprintln!("Parsing error: {}", e); )
Ok(()) });
}) if lenient {
} let f = file.clone();
if file.ends_with(".gz") { loader = loader.on_parse_error(move |e| {
bulk_load( eprintln!("Parsing error on file {}: {}", f, e);
loader, Ok(())
&file[..file.len() - 3], })
MultiGzDecoder::new(File::open(&file)?), }
) let fp = match File::open(&file) {
} else { Ok(fp) => fp,
bulk_load(loader, &file, File::open(&file)?) Err(error) => {
} eprintln!("Error while opening file {}: {}", file, error);
}) return;
}) }
.collect::<Vec<JoinHandle<io::Result<()>>>>(); };
for handle in handles { if let Err(error) = {
handle.join().unwrap()?; if file.ends_with(".gz") {
} bulk_load(
loader,
&file[..file.len() - 3],
MultiGzDecoder::new(fp),
)
} else {
bulk_load(loader, &file, fp)
}
} {
eprintln!("Error while loading file {}: {}", file, error)
}
})
}
});
Ok(()) Ok(())
} }
Command::Serve { bind } => { Command::Serve { bind } => {

Loading…
Cancel
Save