Server: allows bulk loading gzipped files

pull/190/head
Tpt 3 years ago
parent 9e969cd8eb
commit 49804d3b46
  1. 38
      Cargo.lock
  2. 1
      server/Cargo.toml
  3. 82
      server/src/main.rs

38
Cargo.lock generated

@ -2,6 +2,12 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.7.6" version = "0.7.6"
@ -240,6 +246,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "criterion" name = "criterion"
version = "0.3.5" version = "0.3.5"
@ -380,6 +395,18 @@ dependencies = [
"termcolor", "termcolor",
] ]
[[package]]
name = "flate2"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
dependencies = [
"cfg-if",
"crc32fast",
"libc",
"miniz_oxide",
]
[[package]] [[package]]
name = "form_urlencoded" name = "form_urlencoded"
version = "1.0.1" version = "1.0.1"
@ -675,6 +702,16 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
dependencies = [
"adler",
"autocfg",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.0" version = "7.1.0"
@ -804,6 +841,7 @@ name = "oxigraph_server"
version = "0.3.0-beta.4" version = "0.3.0-beta.4"
dependencies = [ dependencies = [
"clap 3.0.14", "clap 3.0.14",
"flate2",
"oxhttp", "oxhttp",
"oxigraph", "oxigraph",
"oxiri", "oxiri",

@ -19,3 +19,4 @@ sparesults = { version = "0.1.0-beta.4", path = "../lib/sparesults", features =
rand = "0.8" rand = "0.8"
url = "2" url = "2"
oxiri = "0.2" oxiri = "0.2"
flate2 = "1"

@ -10,12 +10,13 @@
)] )]
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use flate2::read::MultiGzDecoder;
use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status}; use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status};
use oxhttp::Server; use oxhttp::Server;
use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer}; use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer};
use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode};
use oxigraph::sparql::{Query, QueryResults, Update}; use oxigraph::sparql::{Query, QueryResults, Update};
use oxigraph::store::Store; use oxigraph::store::{BulkLoader, Store};
use oxiri::Iri; use oxiri::Iri;
use rand::random; use rand::random;
use sparesults::{QueryResultsFormat, QueryResultsSerializer}; use sparesults::{QueryResultsFormat, QueryResultsSerializer};
@ -76,35 +77,36 @@ pub fn main() -> std::io::Result<()> {
match matches.command { match matches.command {
Command::Load { file } => { Command::Load { file } => {
let handles = file.iter().map(|file| { let handles = file
let store = store.clone(); .iter()
let file = file.to_string(); .map(|file| {
spawn(move || { let store = store.clone();
let f = file.clone(); let file = file.to_string();
let start = Instant::now(); spawn(move || {
let loader = store.bulk_loader().on_progress(move |size| { let f = file.clone();
let elapsed = start.elapsed(); let start = Instant::now();
println!("{} triples loaded in {}s ({} t/s) from {}", size, elapsed.as_secs(), size / elapsed.as_secs(), f) let loader = store.bulk_loader().on_progress(move |size| {
}); let elapsed = start.elapsed();
let reader = BufReader::new(File::open(&file)?); println!(
if let Some(format) = file "{} triples loaded in {}s ({} t/s) from {}",
.rsplit_once('.') size,
.and_then(|(_, extension)| DatasetFormat::from_extension(extension)) { elapsed.as_secs(),
loader.load_dataset(reader, format, None)?; size / elapsed.as_secs(),
Ok(()) f
} else if let Some(format) = file )
.rsplit_once('.') });
.and_then(|(_, extension)| GraphFormat::from_extension(extension)) { if file.ends_with(".gz") {
loader.load_graph(reader, format, GraphNameRef::DefaultGraph, None)?; bulk_load(
Ok(()) loader,
} else { &file[..file.len() - 3],
Err(io::Error::new( MultiGzDecoder::new(File::open(&file)?),
ErrorKind::InvalidInput, )
"The server is not able to guess the file format of {} from its extension", } else {
)) bulk_load(loader, &file, File::open(&file)?)
} }
})
}) })
}).collect::<Vec<JoinHandle<io::Result<()>>>>(); .collect::<Vec<JoinHandle<io::Result<()>>>>();
for handle in handles { for handle in handles {
handle.join().unwrap()?; handle.join().unwrap()?;
} }
@ -124,6 +126,28 @@ pub fn main() -> std::io::Result<()> {
} }
} }
fn bulk_load(loader: BulkLoader, file: &str, reader: impl Read) -> io::Result<()> {
let (_, extension) = file.rsplit_once('.').ok_or_else(|| io::Error::new(
ErrorKind::InvalidInput,
format!("The server is not able to guess the file format of {} because the file name as no extension", file)))?;
let reader = BufReader::new(reader);
if let Some(format) = DatasetFormat::from_extension(extension) {
loader.load_dataset(reader, format, None)?;
Ok(())
} else if let Some(format) = GraphFormat::from_extension(extension) {
loader.load_graph(reader, format, GraphNameRef::DefaultGraph, None)?;
Ok(())
} else {
Err(io::Error::new(
ErrorKind::InvalidInput,
format!(
"The server is not able to guess the file format from the extension {}",
extension
),
))
}
}
fn handle_request(request: &mut Request, store: Store) -> Response { fn handle_request(request: &mut Request, store: Store) -> Response {
match (request.url().path(), request.method().as_ref()) { match (request.url().path(), request.method().as_ref()) {
("/", "HEAD") => Response::builder(Status::OK) ("/", "HEAD") => Response::builder(Status::OK)

Loading…
Cancel
Save