diff --git a/server/src/main.rs b/server/src/main.rs index 95d7285b..80d2585b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,4 +1,4 @@ -use anyhow::{bail, Context, Error}; +use anyhow::{anyhow, bail, Context, Error}; use clap::{Parser, Subcommand}; use flate2::read::MultiGzDecoder; use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status}; @@ -21,7 +21,7 @@ use std::cmp::{max, min}; use std::ffi::OsStr; use std::fmt; use std::fs::File; -use std::io::{self, BufReader, Read, Write}; +use std::io::{self, stdin, BufRead, BufReader, Read, Write}; use std::path::{Path, PathBuf}; use std::rc::Rc; use std::str; @@ -132,77 +132,109 @@ pub fn main() -> anyhow::Result<()> { } else { None }; - ThreadPoolBuilder::new() - .num_threads(max(1, available_parallelism()?.get() / 2)) - .thread_name(|i| format!("Oxigraph bulk loader thread {i}")) - .build()? - .scope(|s| { - for file in file { - let store = store.clone(); - s.spawn(move |_| { - let f = file.clone(); - let start = Instant::now(); - let mut loader = store.bulk_loader().on_progress(move |size| { - let elapsed = start.elapsed(); - eprintln!( - "{} triples loaded in {}s ({} t/s) from {}", - size, - elapsed.as_secs(), - ((size as f64) / elapsed.as_secs_f64()).round(), - f.display() - ) - }); - if lenient { + if file.is_empty() { + // We read from stdin + let start = Instant::now(); + let mut loader = store.bulk_loader().on_progress(move |size| { + let elapsed = start.elapsed(); + eprintln!( + "{size} triples loaded in {}s ({} t/s)", + elapsed.as_secs(), + ((size as f64) / elapsed.as_secs_f64()).round() + ) + }); + if lenient { + loader = loader.on_parse_error(move |e| { + eprintln!("Parsing error: {e}"); + Ok(()) + }) + } + bulk_load( + loader, + stdin().lock(), + format.ok_or_else(|| { + anyhow!("The --format option must be set when loading from stdin") + })?, + None, + graph, + ) + } else { + ThreadPoolBuilder::new() + .num_threads(max(1, available_parallelism()?.get() / 2)) + .thread_name(|i| format!("Oxigraph bulk loader thread {i}")) + .build()? + .scope(|s| { + for file in file { + let store = store.clone(); + s.spawn(move |_| { let f = file.clone(); - loader = loader.on_parse_error(move |e| { - eprintln!("Parsing error on file {}: {}", f.display(), e); - Ok(()) - }) - } - let fp = match File::open(&file) { - Ok(fp) => fp, - Err(error) => { + let start = Instant::now(); + let mut loader = store.bulk_loader().on_progress(move |size| { + let elapsed = start.elapsed(); + eprintln!( + "{} triples loaded in {}s ({} t/s) from {}", + size, + elapsed.as_secs(), + ((size as f64) / elapsed.as_secs_f64()).round(), + f.display() + ) + }); + if lenient { + let f = file.clone(); + loader = loader.on_parse_error(move |e| { + eprintln!("Parsing error on file {}: {}", f.display(), e); + Ok(()) + }) + } + let fp = match File::open(&file) { + Ok(fp) => fp, + Err(error) => { + eprintln!( + "Error while opening file {}: {}", + file.display(), + error + ); + return; + } + }; + if let Err(error) = { + if file.extension().map_or(false, |e| e == OsStr::new("gz")) { + bulk_load( + loader, + BufReader::new(MultiGzDecoder::new(fp)), + format.unwrap_or_else(|| { + GraphOrDatasetFormat::from_path( + &file.with_extension(""), + ) + .unwrap() + }), + None, + graph, + ) + } else { + bulk_load( + loader, + BufReader::new(fp), + format.unwrap_or_else(|| { + GraphOrDatasetFormat::from_path(&file).unwrap() + }), + None, + graph, + ) + } + } { eprintln!( - "Error while opening file {}: {}", + "Error while loading file {}: {}", file.display(), error - ); - return; - } - }; - if let Err(error) = { - if file.extension().map_or(false, |e| e == OsStr::new("gz")) { - bulk_load( - loader, - MultiGzDecoder::new(fp), - format.unwrap_or_else(|| { - GraphOrDatasetFormat::from_path( - &file.with_extension(""), - ) - .unwrap() - }), - None, - graph, - ) - } else { - bulk_load( - loader, - fp, - format.unwrap_or_else(|| { - GraphOrDatasetFormat::from_path(&file).unwrap() - }), - None, - None, ) + //TODO: hard fail } - } { - eprintln!("Error while loading file {}: {}", file.display(), error) - //TODO: hard fail - } - }) - } - }); - Ok(()) + }) + } + }); + Ok(()) + } } Command::Serve { bind } => { let mut server = Server::new(move |request| { @@ -220,12 +252,11 @@ pub fn main() -> anyhow::Result<()> { fn bulk_load( loader: BulkLoader, - reader: impl Read, + reader: impl BufRead, format: GraphOrDatasetFormat, base_iri: Option<&str>, to_graph_name: Option>, ) -> anyhow::Result<()> { - let reader = BufReader::new(reader); match format { GraphOrDatasetFormat::Graph(format) => loader.load_graph( reader, @@ -1259,6 +1290,20 @@ mod tests { Ok(()) } + #[test] + fn cli_load_from_stdin() -> Result<()> { + cli_command()? + .arg("load") + .arg("--format") + .arg("nq") + .write_stdin(" .") + .assert() + .success() + .stdout("") + .stderr(predicate::str::starts_with("1 triples loaded")); + Ok(()) + } + #[test] fn get_ui() -> Result<()> { ServerTest::new()?.test_status(