Server: Adds "graph" and "format" options to the loader

pull/398/head
Tpt 2 years ago committed by Thomas Tanon
parent ef2701dc0c
commit 4ce1b0e241
  1. 111
      server/src/main.rs

@ -1,10 +1,12 @@
use anyhow::bail; use anyhow::{bail, Context, Error};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use flate2::read::MultiGzDecoder; use flate2::read::MultiGzDecoder;
use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status}; use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status};
use oxhttp::Server; use oxhttp::Server;
use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer}; use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer};
use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::model::{
GraphName, GraphNameRef, IriParseError, NamedNode, NamedNodeRef, NamedOrBlankNode,
};
use oxigraph::sparql::{Query, QueryResults, Update}; use oxigraph::sparql::{Query, QueryResults, Update};
use oxigraph::store::{ use oxigraph::store::{
BulkLoader, LoaderError, ReadOnlyOptions, SecondaryOptions, Store, StoreOpenOptions, BulkLoader, LoaderError, ReadOnlyOptions, SecondaryOptions, Store, StoreOpenOptions,
@ -22,6 +24,7 @@ use std::fs::File;
use std::io::{self, BufReader, Read, Write}; use std::io::{self, BufReader, Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::rc::Rc; use std::rc::Rc;
use std::str;
use std::str::FromStr; use std::str::FromStr;
use std::thread::available_parallelism; use std::thread::available_parallelism;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -64,13 +67,29 @@ enum Command {
/// file(s) to load. /// file(s) to load.
/// ///
/// If multiple files are provided they are loaded in parallel. /// If multiple files are provided they are loaded in parallel.
///
/// If no file is given, stdin is read.
#[arg(short, long, global = true, num_args = 0..)] #[arg(short, long, global = true, num_args = 0..)]
file: Vec<PathBuf>, file: Vec<PathBuf>,
/// The format of the file(s) to load.
///
/// Can be an extension like "nt" or a MIME type like "application/n-triples".
///
/// By default the format is guessed from the loaded file extension.
#[arg(long, global = true, required_unless_present = "file")]
format: Option<String>,
/// Attempt to keep loading even if the data file is invalid. /// Attempt to keep loading even if the data file is invalid.
/// ///
/// Only works with N-Triples and N-Quads for now. /// Only works with N-Triples and N-Quads for now.
#[arg(long, global = true)] #[arg(long, global = true)]
lenient: bool, lenient: bool,
/// Name of the graph to load the data to.
///
/// By default the default graph is used.
///
/// Only available when loading a graph file (N-Triples, Turtle...) and not a dataset file (N-Quads, TriG...).
#[arg(long, global = true)]
graph: Option<String>,
}, },
} }
@ -94,7 +113,25 @@ pub fn main() -> anyhow::Result<()> {
}?; }?;
match matches.command { match matches.command {
Command::Load { file, lenient } => { Command::Load {
file,
lenient,
format,
graph,
} => {
let format = if let Some(format) = format {
Some(GraphOrDatasetFormat::from_str(&format)?)
} else {
None
};
let graph = if let Some(iri) = &graph {
Some(
NamedNodeRef::new(iri)
.with_context(|| format!("The target graph name {iri} is invalid"))?,
)
} else {
None
};
ThreadPoolBuilder::new() ThreadPoolBuilder::new()
.num_threads(max(1, available_parallelism()?.get() / 2)) .num_threads(max(1, available_parallelism()?.get() / 2))
.thread_name(|i| format!("Oxigraph bulk loader thread {i}")) .thread_name(|i| format!("Oxigraph bulk loader thread {i}"))
@ -138,20 +175,29 @@ pub fn main() -> anyhow::Result<()> {
bulk_load( bulk_load(
loader, loader,
MultiGzDecoder::new(fp), MultiGzDecoder::new(fp),
GraphOrDatasetFormat::from_path(&file.with_extension("")) format.unwrap_or_else(|| {
.unwrap(), GraphOrDatasetFormat::from_path(
&file.with_extension(""),
)
.unwrap()
}),
None, None,
graph,
) )
} else { } else {
bulk_load( bulk_load(
loader, loader,
fp, fp,
GraphOrDatasetFormat::from_path(&file).unwrap(), format.unwrap_or_else(|| {
GraphOrDatasetFormat::from_path(&file).unwrap()
}),
None,
None, None,
) )
} }
} { } {
eprintln!("Error while loading file {}: {}", file.display(), error) eprintln!("Error while loading file {}: {}", file.display(), error)
//TODO: hard fail
} }
}) })
} }
@ -177,14 +223,24 @@ fn bulk_load(
reader: impl Read, reader: impl Read,
format: GraphOrDatasetFormat, format: GraphOrDatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), LoaderError> { to_graph_name: Option<NamedNodeRef<'_>>,
) -> anyhow::Result<()> {
let reader = BufReader::new(reader); let reader = BufReader::new(reader);
match format { match format {
GraphOrDatasetFormat::Graph(format) => { GraphOrDatasetFormat::Graph(format) => loader.load_graph(
loader.load_graph(reader, format, GraphNameRef::DefaultGraph, base_iri) reader,
format,
to_graph_name.map_or(GraphNameRef::DefaultGraph, GraphNameRef::from),
base_iri,
)?,
GraphOrDatasetFormat::Dataset(format) => {
if to_graph_name.is_some() {
bail!("The --graph option is not allowed when loading a dataset format like NQuads or TriG");
}
loader.load_dataset(reader, format, base_iri)?
} }
GraphOrDatasetFormat::Dataset(format) => loader.load_dataset(reader, format, base_iri),
} }
Ok(())
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -210,7 +266,7 @@ impl GraphOrDatasetFormat {
} }
fn from_extension(name: &str) -> anyhow::Result<Self> { fn from_extension(name: &str) -> anyhow::Result<Self> {
Ok( match (GraphFormat::from_extension(name), DatasetFormat::from_extension(name)) { Ok(match (GraphFormat::from_extension(name), DatasetFormat::from_extension(name)) {
(Some(g), Some(d)) => bail!("The file extension '{name}' can be resolved to both '{}' and '{}', not sure what to pick", g.file_extension(), d.file_extension()), (Some(g), Some(d)) => bail!("The file extension '{name}' can be resolved to both '{}' and '{}', not sure what to pick", g.file_extension(), d.file_extension()),
(Some(g), None) => GraphOrDatasetFormat::Graph(g), (Some(g), None) => GraphOrDatasetFormat::Graph(g),
(None, Some(d)) => GraphOrDatasetFormat::Dataset(d), (None, Some(d)) => GraphOrDatasetFormat::Dataset(d),
@ -238,6 +294,20 @@ impl GraphOrDatasetFormat {
} }
} }
impl FromStr for GraphOrDatasetFormat {
type Err = Error;
fn from_str(name: &str) -> anyhow::Result<Self> {
if let Ok(t) = Self::from_extension(name) {
return Ok(t);
}
if let Ok(t) = Self::from_media_type(name) {
return Ok(t);
}
bail!("The file format '{name}' is unknown")
}
}
type HttpError = (Status, String); type HttpError = (Status, String);
fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpError> { fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpError> {
@ -1170,6 +1240,25 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn cli_load_with_graph_and_format() -> Result<()> {
let file = assert_fs::NamedTempFile::new("sample")?;
file.write_str("<http://example.com/s> <http://example.com/p> <http://example.com/o> .")?;
cli_command()?
.arg("load")
.arg("-f")
.arg(file.path())
.arg("--format")
.arg("nt")
.arg("--graph")
.arg("http://example.com")
.assert()
.success()
.stdout("")
.stderr(predicate::str::starts_with("1 triples loaded"));
Ok(())
}
#[test] #[test]
fn get_ui() -> Result<()> { fn get_ui() -> Result<()> {
ServerTest::new()?.test_status( ServerTest::new()?.test_status(

Loading…
Cancel
Save