From 49804d3b46e14397aefe60f9bbcdc3b96c8d7ed6 Mon Sep 17 00:00:00 2001 From: Tpt Date: Sat, 12 Feb 2022 12:15:37 +0100 Subject: [PATCH] Server: allows bulk loading gzipped files --- Cargo.lock | 38 +++++++++++++++++++++ server/Cargo.toml | 1 + server/src/main.rs | 82 ++++++++++++++++++++++++++++++---------------- 3 files changed, 92 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fa6c8f44..656fbe3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.7.6" @@ -240,6 +246,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.3.5" @@ -380,6 +395,18 @@ dependencies = [ "termcolor", ] +[[package]] +name = "flate2" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -675,6 +702,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "nom" version = "7.1.0" @@ -804,6 +841,7 @@ name = "oxigraph_server" version = "0.3.0-beta.4" dependencies = [ "clap 3.0.14", + "flate2", "oxhttp", "oxigraph", "oxiri", diff --git a/server/Cargo.toml b/server/Cargo.toml index 6cb0f5e2..0d42b824 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -19,3 +19,4 @@ sparesults = { version = "0.1.0-beta.4", path = "../lib/sparesults", features = rand = "0.8" url = "2" oxiri = "0.2" +flate2 = "1" diff --git a/server/src/main.rs b/server/src/main.rs index bb01e144..db728038 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -10,12 +10,13 @@ )] use clap::{Parser, Subcommand}; +use flate2::read::MultiGzDecoder; use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status}; use oxhttp::Server; use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer}; use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::sparql::{Query, QueryResults, Update}; -use oxigraph::store::Store; +use oxigraph::store::{BulkLoader, Store}; use oxiri::Iri; use rand::random; use sparesults::{QueryResultsFormat, QueryResultsSerializer}; @@ -76,35 +77,36 @@ pub fn main() -> std::io::Result<()> { match matches.command { Command::Load { file } => { - let handles = file.iter().map(|file| { - let store = store.clone(); - let file = file.to_string(); - spawn(move || { - let f = file.clone(); - let start = Instant::now(); - let loader = store.bulk_loader().on_progress(move |size| { - let elapsed = start.elapsed(); - println!("{} triples loaded in {}s ({} t/s) from {}", size, elapsed.as_secs(), size / elapsed.as_secs(), f) - }); - let reader = BufReader::new(File::open(&file)?); - if let Some(format) = file - .rsplit_once('.') - .and_then(|(_, extension)| DatasetFormat::from_extension(extension)) { - loader.load_dataset(reader, format, None)?; - Ok(()) - } else if let Some(format) = file - .rsplit_once('.') - .and_then(|(_, extension)| GraphFormat::from_extension(extension)) { - loader.load_graph(reader, format, GraphNameRef::DefaultGraph, None)?; - Ok(()) - } else { - Err(io::Error::new( - ErrorKind::InvalidInput, - "The server is not able to guess the file format of {} from its extension", - )) - } + let handles = file + .iter() + .map(|file| { + let store = store.clone(); + let file = file.to_string(); + spawn(move || { + let f = file.clone(); + let start = Instant::now(); + let loader = store.bulk_loader().on_progress(move |size| { + let elapsed = start.elapsed(); + println!( + "{} triples loaded in {}s ({} t/s) from {}", + size, + elapsed.as_secs(), + size / elapsed.as_secs(), + f + ) + }); + if file.ends_with(".gz") { + bulk_load( + loader, + &file[..file.len() - 3], + MultiGzDecoder::new(File::open(&file)?), + ) + } else { + bulk_load(loader, &file, File::open(&file)?) + } + }) }) - }).collect::>>>(); + .collect::>>>(); for handle in handles { handle.join().unwrap()?; } @@ -124,6 +126,28 @@ pub fn main() -> std::io::Result<()> { } } +fn bulk_load(loader: BulkLoader, file: &str, reader: impl Read) -> io::Result<()> { + let (_, extension) = file.rsplit_once('.').ok_or_else(|| io::Error::new( + ErrorKind::InvalidInput, + format!("The server is not able to guess the file format of {} because the file name as no extension", file)))?; + let reader = BufReader::new(reader); + if let Some(format) = DatasetFormat::from_extension(extension) { + loader.load_dataset(reader, format, None)?; + Ok(()) + } else if let Some(format) = GraphFormat::from_extension(extension) { + loader.load_graph(reader, format, GraphNameRef::DefaultGraph, None)?; + Ok(()) + } else { + Err(io::Error::new( + ErrorKind::InvalidInput, + format!( + "The server is not able to guess the file format from the extension {}", + extension + ), + )) + } +} + fn handle_request(request: &mut Request, store: Store) -> Response { match (request.url().path(), request.method().as_ref()) { ("/", "HEAD") => Response::builder(Status::OK)