Adds bulk loader option to HTTP API

pull/300/head
Tpt 2 years ago committed by Thomas Tanon
parent 112631a0d7
commit 127ffc3547
  1. 230
      server/src/main.rs

@ -11,6 +11,7 @@ use oxiri::Iri;
use rand::random;
use rayon_core::ThreadPoolBuilder;
use sparesults::{QueryResultsFormat, QueryResultsSerializer};
use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::{max, min};
use std::ffi::OsStr;
@ -164,8 +165,7 @@ fn bulk_load(
loader.load_graph(reader, format, GraphNameRef::DefaultGraph, base_iri)
}
GraphOrDatasetFormat::Dataset(format) => loader.load_dataset(reader, format, base_iri),
}?;
Ok(())
}
}
#[derive(Copy, Clone)]
@ -386,14 +386,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
true
}
};
store
.load_graph(
BufReader::new(request.body_mut()),
format,
GraphName::from(target).as_ref(),
None,
)
.map_err(bad_request)?;
web_load_graph(&store, request, format, GraphName::from(target).as_ref())?;
Ok(Response::builder(if new {
Status::CREATED
} else {
@ -404,9 +397,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
let format = DatasetFormat::from_media_type(&content_type)
.ok_or_else(|| unsupported_media_type(&content_type))?;
store.clear().map_err(internal_server_error)?;
store
.load_dataset(BufReader::new(request.body_mut()), format, None)
.map_err(internal_server_error)?;
web_load_dataset(&store, request, format)?;
Ok(Response::builder(Status::NO_CONTENT).build())
}
}
@ -444,14 +435,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
let format = GraphFormat::from_media_type(&content_type)
.ok_or_else(|| unsupported_media_type(&content_type))?;
let new = assert_that_graph_exists(&store, &target).is_ok();
store
.load_graph(
BufReader::new(request.body_mut()),
format,
GraphName::from(target).as_ref(),
None,
)
.map_err(bad_request)?;
web_load_graph(&store, request, format, GraphName::from(target).as_ref())?;
Ok(Response::builder(if new {
Status::CREATED
} else {
@ -465,18 +449,14 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
GraphOrDatasetFormat::Graph(format) => {
let graph =
resolve_with_base(request, &format!("/store/{:x}", random::<u128>()))?;
store
.load_graph(BufReader::new(request.body_mut()), format, &graph, None)
.map_err(bad_request)?;
web_load_graph(&store, request, format, graph.as_ref().into())?;
Ok(Response::builder(Status::CREATED)
.with_header(HeaderName::LOCATION, graph.into_string())
.unwrap()
.build())
}
GraphOrDatasetFormat::Dataset(format) => {
store
.load_dataset(BufReader::new(request.body_mut()), format, None)
.map_err(bad_request)?;
web_load_dataset(&store, request, format)?;
Ok(Response::builder(Status::NO_CONTENT).build())
}
}
@ -520,6 +500,14 @@ fn url_query(request: &Request) -> &[u8] {
request.url().query().unwrap_or("").as_bytes()
}
/// Looks up a query-string parameter of the request URL by name.
///
/// Walks the URL's query pairs in order and returns the value of the first pair
/// whose key equals `param`, or `None` when no pair has that key.
fn url_query_parameter<'a>(request: &'a Request, param: &str) -> Option<Cow<'a, str>> {
    for (key, value) in request.url().query_pairs() {
        if key == param {
            // First match wins; later duplicates of the same key are ignored.
            return Some(value);
        }
    }
    None
}
fn configure_and_evaluate_sparql_query(
store: Store,
encoded: &[&[u8]],
@ -739,13 +727,11 @@ fn store_target(request: &Request) -> Result<Option<NamedGraphName>, HttpError>
if request.url().path() == "/store" {
let mut graph = None;
let mut default = false;
for (k, v) in form_urlencoded::parse(request.url().query().unwrap_or("").as_bytes()) {
for (k, v) in request.url().query_pairs() {
match k.as_ref() {
"graph" => graph = Some(v.into_owned()),
"default" => default = true,
_ => {
return Err(bad_request(format!("Unexpected parameter: {}", k)));
}
_ => continue,
}
}
if let Some(graph) = graph {
@ -917,6 +903,67 @@ fn content_type(request: &Request) -> Option<String> {
)
}
/// Loads the request body into the graph `to_graph_name` of `store`.
///
/// When the URL carries a `no_transaction` query parameter the data goes through
/// the loader built by `web_bulk_loader`; otherwise `Store::load_graph` is used.
/// No base IRI is passed in either case.
///
/// # Errors
///
/// Any loader failure is converted to an `HttpError` by `loader_to_http_error`.
fn web_load_graph(
    store: &Store,
    request: &mut Request,
    format: GraphFormat,
    to_graph_name: GraphNameRef<'_>,
) -> Result<(), HttpError> {
    let skip_transaction = url_query_parameter(request, "no_transaction").is_some();
    let outcome = if skip_transaction {
        // Build the loader first: it only needs a shared borrow of the request,
        // which ends before the body is borrowed mutably below.
        let loader = web_bulk_loader(store, request);
        loader.load_graph(BufReader::new(request.body_mut()), format, to_graph_name, None)
    } else {
        store.load_graph(BufReader::new(request.body_mut()), format, to_graph_name, None)
    };
    outcome.map_err(loader_to_http_error)
}
/// Loads the request body into `store` as a dataset.
///
/// When the URL carries a `no_transaction` query parameter the data goes through
/// the loader built by `web_bulk_loader`; otherwise `Store::load_dataset` is used.
/// No base IRI is passed in either case.
///
/// # Errors
///
/// Any loader failure is converted to an `HttpError` by `loader_to_http_error`.
fn web_load_dataset(
    store: &Store,
    request: &mut Request,
    format: DatasetFormat,
) -> Result<(), HttpError> {
    let skip_transaction = url_query_parameter(request, "no_transaction").is_some();
    let outcome = if skip_transaction {
        // The loader is configured from the URL before the body is borrowed mutably.
        let loader = web_bulk_loader(store, request);
        loader.load_dataset(BufReader::new(request.body_mut()), format, None)
    } else {
        store.load_dataset(BufReader::new(request.body_mut()), format, None)
    };
    outcome.map_err(loader_to_http_error)
}
/// Builds the bulk loader used for HTTP uploads.
///
/// The loader always reports progress on stderr (count, whole seconds elapsed since
/// this function was called, and the rounded rate). If the URL carries a `lenient`
/// query parameter, parse errors are printed to stderr and the callback returns
/// `Ok(())` instead of an error.
fn web_bulk_loader(store: &Store, request: &Request) -> BulkLoader {
    let begin = Instant::now();
    let loader = store.bulk_loader().on_progress(move |count| {
        let spent = begin.elapsed();
        let rate = ((count as f64) / spent.as_secs_f64()).round();
        eprintln!(
            "{} triples loaded in {}s ({} t/s)",
            count,
            spent.as_secs(),
            rate
        )
    });
    if url_query_parameter(request, "lenient").is_none() {
        // Strict mode: keep the loader's own parse-error behavior.
        return loader;
    }
    loader.on_parse_error(move |parse_failure| {
        eprintln!("Parsing error: {}", parse_failure);
        Ok(())
    })
}
fn error(status: Status, message: impl fmt::Display) -> Response {
Response::builder(status)
.with_header(HeaderName::CONTENT_TYPE, "text/plain; charset=utf-8")
@ -940,6 +987,13 @@ fn internal_server_error(message: impl fmt::Display) -> HttpError {
(Status::INTERNAL_SERVER_ERROR, message.to_string())
}
/// Converts a loader failure into the HTTP error returned to the client.
///
/// Parsing failures are routed through `bad_request`; storage failures are routed
/// through `internal_server_error`.
fn loader_to_http_error(e: LoaderError) -> HttpError {
    match e {
        LoaderError::Storage(storage_failure) => internal_server_error(storage_failure),
        LoaderError::Parsing(parse_failure) => bad_request(parse_failure),
    }
}
/// Hacky tool to allow implementing read on top of a write loop
struct ReadForWrite<O, U: (Fn(O) -> io::Result<Option<O>>)> {
buffer: Rc<RefCell<Vec<u8>>>,
@ -1666,6 +1720,120 @@ mod tests {
server.test_status(request, Status::OK);
}
/// Exercises the `/store` endpoints with both the `no_transaction` and `lenient`
/// query parameters set, uploading a payload that ends with a stray `foo` token.
/// Only the HTTP status of each response is checked, not the stored content.
#[test]
fn graph_store_lenient_bulk() {
let server = ServerTest::new();
// Two prefix declarations and one triple, followed by a trailing `foo` token
// (presumably the parse error that `lenient` is expected to tolerate).
let invalid_data = "
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix v: <http://www.w3.org/2006/vcard/ns#> .
<http://$HOST$/$GRAPHSTORE$/person/1> a foaf:Person . foo";
// POST the payload as Turtle to a graph addressed directly by its URL
let request = Request::builder(
Method::POST,
"http://localhost/store/person/1.ttl?no_transaction&lenient"
.parse()
.unwrap(),
)
.with_header(HeaderName::CONTENT_TYPE, "text/turtle; charset=utf-8")
.unwrap()
.with_body(invalid_data);
server.test_status(request, Status::NO_CONTENT);
// GET of POST: the graph must now be retrievable
let request = Request::builder(
Method::GET,
"http://localhost/store?graph=/store/person/1.ttl"
.parse()
.unwrap(),
)
.with_header(HeaderName::ACCEPT, "text/turtle")
.unwrap()
.build();
server.test_status(request, Status::OK);
// POST dataset: same payload sent as TriG to the store root
let request = Request::builder(
Method::POST,
"http://localhost/store?lenient&no_transaction"
.parse()
.unwrap(),
)
.with_header(HeaderName::CONTENT_TYPE, "application/trig; charset=utf-8")
.unwrap()
.with_body(invalid_data);
server.test_status(request, Status::NO_CONTENT);
// GET of POST dataset: read back the default graph
let request = Request::builder(
Method::GET,
"http://localhost/store?default".parse().unwrap(),
)
.with_header(HeaderName::ACCEPT, "text/turtle")
.unwrap()
.build();
server.test_status(request, Status::OK);
// PUT the payload as Turtle to the graph that already exists from the POST above
let request = Request::builder(
Method::PUT,
"http://localhost/store/person/1.ttl?lenient&no_transaction"
.parse()
.unwrap(),
)
.with_header(HeaderName::CONTENT_TYPE, "text/turtle; charset=utf-8")
.unwrap()
.with_body(invalid_data);
server.test_status(request, Status::NO_CONTENT);
// GET of PUT - Initial state
let request = Request::builder(
Method::GET,
"http://localhost/store?graph=/store/person/1.ttl"
.parse()
.unwrap(),
)
.with_header(HeaderName::ACCEPT, "text/turtle")
.unwrap()
.build();
server.test_status(request, Status::OK);
// PUT dataset: same payload sent as TriG to the store root
let request = Request::builder(
Method::PUT,
"http://localhost/store?lenient&no_transaction"
.parse()
.unwrap(),
)
.with_header(HeaderName::CONTENT_TYPE, "application/trig; charset=utf-8")
.unwrap()
.with_body(invalid_data);
server.test_status(request, Status::NO_CONTENT);
// GET of PUT dataset: the default graph is still readable
let request = Request::builder(
Method::GET,
"http://localhost/store?default".parse().unwrap(),
)
.with_header(HeaderName::ACCEPT, "text/turtle")
.unwrap()
.build();
server.test_status(request, Status::OK);
// GET of PUT dataset - replacement: the named graph created earlier is expected to be
// gone (NOTE(review): presumably because the dataset PUT replaces the whole store — confirm)
let request = Request::builder(
Method::GET,
"http://localhost/store?graph=/store/person/1.ttl"
.parse()
.unwrap(),
)
.with_header(HeaderName::ACCEPT, "text/turtle")
.unwrap()
.build();
server.test_status(request, Status::NOT_FOUND);
}
/// Test fixture wrapping a `Store`; the tests create it with `ServerTest::new()`
/// and check responses through its `test_status` method.
struct ServerTest {
store: Store,
}

Loading…
Cancel
Save