From 127ffc3547ca2dae7278263c053c8687daf6ef62 Mon Sep 17 00:00:00 2001 From: Tpt Date: Sat, 26 Nov 2022 12:04:02 +0100 Subject: [PATCH] Adds bulk loader option to HTTP API --- server/src/main.rs | 230 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 199 insertions(+), 31 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index f126f8c0..be3130fb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -11,6 +11,7 @@ use oxiri::Iri; use rand::random; use rayon_core::ThreadPoolBuilder; use sparesults::{QueryResultsFormat, QueryResultsSerializer}; +use std::borrow::Cow; use std::cell::RefCell; use std::cmp::{max, min}; use std::ffi::OsStr; @@ -164,8 +165,7 @@ fn bulk_load( loader.load_graph(reader, format, GraphNameRef::DefaultGraph, base_iri) } GraphOrDatasetFormat::Dataset(format) => loader.load_dataset(reader, format, base_iri), - }?; - Ok(()) + } } #[derive(Copy, Clone)] @@ -386,14 +386,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result Result Result Result { let graph = resolve_with_base(request, &format!("/store/{:x}", random::()))?; - store - .load_graph(BufReader::new(request.body_mut()), format, &graph, None) - .map_err(bad_request)?; + web_load_graph(&store, request, format, graph.as_ref().into())?; Ok(Response::builder(Status::CREATED) .with_header(HeaderName::LOCATION, graph.into_string()) .unwrap() .build()) } GraphOrDatasetFormat::Dataset(format) => { - store - .load_dataset(BufReader::new(request.body_mut()), format, None) - .map_err(bad_request)?; + web_load_dataset(&store, request, format)?; Ok(Response::builder(Status::NO_CONTENT).build()) } } @@ -520,6 +500,14 @@ fn url_query(request: &Request) -> &[u8] { request.url().query().unwrap_or("").as_bytes() } +fn url_query_parameter<'a>(request: &'a Request, param: &str) -> Option> { + request + .url() + .query_pairs() + .find(|(k, _)| k == param) + .map(|(_, v)| v) +} + fn configure_and_evaluate_sparql_query( store: Store, encoded: &[&[u8]], @@ -739,13 +727,11 @@ fn store_target(request: &Request) -> Result, HttpError> if request.url().path() == "/store" { let mut graph = None; let mut default = false; - for (k, v) in form_urlencoded::parse(request.url().query().unwrap_or("").as_bytes()) { + for (k, v) in request.url().query_pairs() { match k.as_ref() { "graph" => graph = Some(v.into_owned()), "default" => default = true, - _ => { - return Err(bad_request(format!("Unexpected parameter: {}", k))); - } + _ => continue, } } if let Some(graph) = graph { @@ -917,6 +903,67 @@ fn content_type(request: &Request) -> Option { ) } +fn web_load_graph( + store: &Store, + request: &mut Request, + format: GraphFormat, + to_graph_name: GraphNameRef<'_>, +) -> Result<(), HttpError> { + if url_query_parameter(request, "no_transaction").is_some() { + web_bulk_loader(store, request).load_graph( + BufReader::new(request.body_mut()), + format, + to_graph_name, + None, + ) + } else { + store.load_graph( + BufReader::new(request.body_mut()), + format, + to_graph_name, + None, + ) + } + .map_err(loader_to_http_error) +} + +fn web_load_dataset( + store: &Store, + request: &mut Request, + format: DatasetFormat, +) -> Result<(), HttpError> { + if url_query_parameter(request, "no_transaction").is_some() { + web_bulk_loader(store, request).load_dataset( + BufReader::new(request.body_mut()), + format, + None, + ) + } else { + store.load_dataset(BufReader::new(request.body_mut()), format, None) + } + .map_err(loader_to_http_error) +} + +fn web_bulk_loader(store: &Store, request: &Request) -> BulkLoader { + let start = Instant::now(); + let mut loader = store.bulk_loader().on_progress(move |size| { + let elapsed = start.elapsed(); + eprintln!( + "{} triples loaded in {}s ({} t/s)", + size, + elapsed.as_secs(), + ((size as f64) / elapsed.as_secs_f64()).round() + ) + }); + if url_query_parameter(request, "lenient").is_some() { + loader = loader.on_parse_error(move |e| { + eprintln!("Parsing error: {}", e); + Ok(()) + }) + } + loader +} + fn error(status: Status, message: impl fmt::Display) -> Response { Response::builder(status) .with_header(HeaderName::CONTENT_TYPE, "text/plain; charset=utf-8") @@ -940,6 +987,13 @@ fn internal_server_error(message: impl fmt::Display) -> HttpError { (Status::INTERNAL_SERVER_ERROR, message.to_string()) } +fn loader_to_http_error(e: LoaderError) -> HttpError { + match e { + LoaderError::Parsing(e) => bad_request(e), + LoaderError::Storage(e) => internal_server_error(e), + } +} + /// Hacky tool to allow implementing read on top of a write loop struct ReadForWrite io::Result>)> { buffer: Rc>>, @@ -1666,6 +1720,120 @@ mod tests { server.test_status(request, Status::OK); } + #[test] + fn graph_store_lenient_bulk() { + let server = ServerTest::new(); + let invalid_data = " +@prefix foaf: . +@prefix v: . + + a foaf:Person . foo"; + + // POST + let request = Request::builder( + Method::POST, + "http://localhost/store/person/1.ttl?no_transaction&lenient" + .parse() + .unwrap(), + ) + .with_header(HeaderName::CONTENT_TYPE, "text/turtle; charset=utf-8") + .unwrap() + .with_body(invalid_data); + server.test_status(request, Status::NO_CONTENT); + + // GET of POST + let request = Request::builder( + Method::GET, + "http://localhost/store?graph=/store/person/1.ttl" + .parse() + .unwrap(), + ) + .with_header(HeaderName::ACCEPT, "text/turtle") + .unwrap() + .build(); + server.test_status(request, Status::OK); + + // POST dataset + let request = Request::builder( + Method::POST, + "http://localhost/store?lenient&no_transaction" + .parse() + .unwrap(), + ) + .with_header(HeaderName::CONTENT_TYPE, "application/trig; charset=utf-8") + .unwrap() + .with_body(invalid_data); + server.test_status(request, Status::NO_CONTENT); + + // GET of POST dataset + let request = Request::builder( + Method::GET, + "http://localhost/store?default".parse().unwrap(), + ) + .with_header(HeaderName::ACCEPT, "text/turtle") + .unwrap() + .build(); + server.test_status(request, Status::OK); + + // PUT + let request = Request::builder( + Method::PUT, + "http://localhost/store/person/1.ttl?lenient&no_transaction" + .parse() + .unwrap(), + ) + .with_header(HeaderName::CONTENT_TYPE, "text/turtle; charset=utf-8") + .unwrap() + .with_body(invalid_data); + server.test_status(request, Status::NO_CONTENT); + + // GET of PUT - Initial state + let request = Request::builder( + Method::GET, + "http://localhost/store?graph=/store/person/1.ttl" + .parse() + .unwrap(), + ) + .with_header(HeaderName::ACCEPT, "text/turtle") + .unwrap() + .build(); + server.test_status(request, Status::OK); + + // PUT dataset + let request = Request::builder( + Method::PUT, + "http://localhost/store?lenient&no_transaction" + .parse() + .unwrap(), + ) + .with_header(HeaderName::CONTENT_TYPE, "application/trig; charset=utf-8") + .unwrap() + .with_body(invalid_data); + server.test_status(request, Status::NO_CONTENT); + + // GET of PUT dataset + let request = Request::builder( + Method::GET, + "http://localhost/store?default".parse().unwrap(), + ) + .with_header(HeaderName::ACCEPT, "text/turtle") + .unwrap() + .build(); + server.test_status(request, Status::OK); + + // GET of PUT dataset - replacement + let request = Request::builder( + Method::GET, + "http://localhost/store?graph=/store/person/1.ttl" + .parse() + .unwrap(), + ) + .with_header(HeaderName::ACCEPT, "text/turtle") + .unwrap() + .build(); + server.test_status(request, Status::NOT_FOUND); + } + struct ServerTest { store: Store, }