Server: Allows loading from stdin

pull/398/head
Tpt 2 years ago committed by Thomas Tanon
parent 4ce1b0e241
commit 03df957427
  1. 183
      server/src/main.rs

@ -1,4 +1,4 @@
use anyhow::{bail, Context, Error}; use anyhow::{anyhow, bail, Context, Error};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use flate2::read::MultiGzDecoder; use flate2::read::MultiGzDecoder;
use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status}; use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status};
@ -21,7 +21,7 @@ use std::cmp::{max, min};
use std::ffi::OsStr; use std::ffi::OsStr;
use std::fmt; use std::fmt;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader, Read, Write}; use std::io::{self, stdin, BufRead, BufReader, Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::rc::Rc; use std::rc::Rc;
use std::str; use std::str;
@ -132,77 +132,109 @@ pub fn main() -> anyhow::Result<()> {
} else { } else {
None None
}; };
ThreadPoolBuilder::new() if file.is_empty() {
.num_threads(max(1, available_parallelism()?.get() / 2)) // We read from stdin
.thread_name(|i| format!("Oxigraph bulk loader thread {i}")) let start = Instant::now();
.build()? let mut loader = store.bulk_loader().on_progress(move |size| {
.scope(|s| { let elapsed = start.elapsed();
for file in file { eprintln!(
let store = store.clone(); "{size} triples loaded in {}s ({} t/s)",
s.spawn(move |_| { elapsed.as_secs(),
let f = file.clone(); ((size as f64) / elapsed.as_secs_f64()).round()
let start = Instant::now(); )
let mut loader = store.bulk_loader().on_progress(move |size| { });
let elapsed = start.elapsed(); if lenient {
eprintln!( loader = loader.on_parse_error(move |e| {
"{} triples loaded in {}s ({} t/s) from {}", eprintln!("Parsing error: {e}");
size, Ok(())
elapsed.as_secs(), })
((size as f64) / elapsed.as_secs_f64()).round(), }
f.display() bulk_load(
) loader,
}); stdin().lock(),
if lenient { format.ok_or_else(|| {
anyhow!("The --format option must be set when loading from stdin")
})?,
None,
graph,
)
} else {
ThreadPoolBuilder::new()
.num_threads(max(1, available_parallelism()?.get() / 2))
.thread_name(|i| format!("Oxigraph bulk loader thread {i}"))
.build()?
.scope(|s| {
for file in file {
let store = store.clone();
s.spawn(move |_| {
let f = file.clone(); let f = file.clone();
loader = loader.on_parse_error(move |e| { let start = Instant::now();
eprintln!("Parsing error on file {}: {}", f.display(), e); let mut loader = store.bulk_loader().on_progress(move |size| {
Ok(()) let elapsed = start.elapsed();
}) eprintln!(
} "{} triples loaded in {}s ({} t/s) from {}",
let fp = match File::open(&file) { size,
Ok(fp) => fp, elapsed.as_secs(),
Err(error) => { ((size as f64) / elapsed.as_secs_f64()).round(),
f.display()
)
});
if lenient {
let f = file.clone();
loader = loader.on_parse_error(move |e| {
eprintln!("Parsing error on file {}: {}", f.display(), e);
Ok(())
})
}
let fp = match File::open(&file) {
Ok(fp) => fp,
Err(error) => {
eprintln!(
"Error while opening file {}: {}",
file.display(),
error
);
return;
}
};
if let Err(error) = {
if file.extension().map_or(false, |e| e == OsStr::new("gz")) {
bulk_load(
loader,
BufReader::new(MultiGzDecoder::new(fp)),
format.unwrap_or_else(|| {
GraphOrDatasetFormat::from_path(
&file.with_extension(""),
)
.unwrap()
}),
None,
graph,
)
} else {
bulk_load(
loader,
BufReader::new(fp),
format.unwrap_or_else(|| {
GraphOrDatasetFormat::from_path(&file).unwrap()
}),
None,
graph,
)
}
} {
eprintln!( eprintln!(
"Error while opening file {}: {}", "Error while loading file {}: {}",
file.display(), file.display(),
error error
);
return;
}
};
if let Err(error) = {
if file.extension().map_or(false, |e| e == OsStr::new("gz")) {
bulk_load(
loader,
MultiGzDecoder::new(fp),
format.unwrap_or_else(|| {
GraphOrDatasetFormat::from_path(
&file.with_extension(""),
)
.unwrap()
}),
None,
graph,
)
} else {
bulk_load(
loader,
fp,
format.unwrap_or_else(|| {
GraphOrDatasetFormat::from_path(&file).unwrap()
}),
None,
None,
) )
//TODO: hard fail
} }
} { })
eprintln!("Error while loading file {}: {}", file.display(), error) }
//TODO: hard fail });
} Ok(())
}) }
}
});
Ok(())
} }
Command::Serve { bind } => { Command::Serve { bind } => {
let mut server = Server::new(move |request| { let mut server = Server::new(move |request| {
@ -220,12 +252,11 @@ pub fn main() -> anyhow::Result<()> {
fn bulk_load( fn bulk_load(
loader: BulkLoader, loader: BulkLoader,
reader: impl Read, reader: impl BufRead,
format: GraphOrDatasetFormat, format: GraphOrDatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
to_graph_name: Option<NamedNodeRef<'_>>, to_graph_name: Option<NamedNodeRef<'_>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let reader = BufReader::new(reader);
match format { match format {
GraphOrDatasetFormat::Graph(format) => loader.load_graph( GraphOrDatasetFormat::Graph(format) => loader.load_graph(
reader, reader,
@ -1259,6 +1290,20 @@ mod tests {
Ok(()) Ok(())
} }
/// Checks that the `load` sub-command can be fed through stdin.
///
/// No file argument is passed, only `--format nq`, and one statement is
/// written to the process's stdin. The run must succeed, print nothing on
/// stdout, and produce stderr output that starts with `1 triples loaded`.
#[test]
fn cli_load_from_stdin() -> Result<()> {
    let mut command = cli_command()?;
    command.args(["load", "--format", "nq"]);
    command.write_stdin("<http://example.com/s> <http://example.com/p> <http://example.com/o> .");

    let outcome = command.assert().success();
    outcome
        .stdout("")
        .stderr(predicate::str::starts_with("1 triples loaded"));
    Ok(())
}
#[test] #[test]
fn get_ui() -> Result<()> { fn get_ui() -> Result<()> {
ServerTest::new()?.test_status( ServerTest::new()?.test_status(

Loading…
Cancel
Save