|
|
|
@ -15,8 +15,12 @@ use async_std::future::Future; |
|
|
|
|
use async_std::net::{TcpListener, TcpStream}; |
|
|
|
|
use async_std::prelude::*; |
|
|
|
|
use async_std::task::spawn; |
|
|
|
|
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; |
|
|
|
|
use http_types::content::{Accept, ContentType}; |
|
|
|
|
use http_types::{ |
|
|
|
|
bail_status, headers, Error, Method, Mime, Request, Response, Result, StatusCode, |
|
|
|
|
}; |
|
|
|
|
use oxigraph::io::GraphFormat; |
|
|
|
|
use oxigraph::model::{GraphName, NamedNode, NamedOrBlankNode}; |
|
|
|
|
use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat}; |
|
|
|
|
use oxigraph::RocksDbStore; |
|
|
|
|
use std::str::FromStr; |
|
|
|
@ -102,14 +106,9 @@ pub async fn main() -> Result<()> { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn handle_request(request: Request, store: RocksDbStore) -> Result<Response> { |
|
|
|
|
let mut response = match (request.url().path(), request.method()) { |
|
|
|
|
Ok(match (request.url().path(), request.method()) { |
|
|
|
|
("/query", Method::Get) => { |
|
|
|
|
evaluate_urlencoded_sparql_query( |
|
|
|
|
store, |
|
|
|
|
request.url().query().unwrap_or("").as_bytes().to_vec(), |
|
|
|
|
request, |
|
|
|
|
) |
|
|
|
|
.await? |
|
|
|
|
configure_and_evaluate_sparql_query(store, url_query(&request), None, request)? |
|
|
|
|
} |
|
|
|
|
("/query", Method::Post) => { |
|
|
|
|
if let Some(content_type) = request.content_type() { |
|
|
|
@ -121,7 +120,12 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons |
|
|
|
|
.take(MAX_SPARQL_BODY_SIZE) |
|
|
|
|
.read_to_string(&mut buffer) |
|
|
|
|
.await?; |
|
|
|
|
evaluate_sparql_query(store, buffer, request).await? |
|
|
|
|
configure_and_evaluate_sparql_query( |
|
|
|
|
store, |
|
|
|
|
url_query(&request), |
|
|
|
|
Some(buffer), |
|
|
|
|
request, |
|
|
|
|
)? |
|
|
|
|
} else if content_type.essence() == "application/x-www-form-urlencoded" { |
|
|
|
|
let mut buffer = Vec::new(); |
|
|
|
|
let mut request = request; |
|
|
|
@ -130,59 +134,83 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons |
|
|
|
|
.take(MAX_SPARQL_BODY_SIZE) |
|
|
|
|
.read_to_end(&mut buffer) |
|
|
|
|
.await?; |
|
|
|
|
evaluate_urlencoded_sparql_query(store, buffer, request).await? |
|
|
|
|
configure_and_evaluate_sparql_query(store, buffer, None, request)? |
|
|
|
|
} else { |
|
|
|
|
simple_response( |
|
|
|
|
StatusCode::UnsupportedMediaType, |
|
|
|
|
format!("No supported Content-Type given: {}", content_type), |
|
|
|
|
) |
|
|
|
|
bail_status!(415, "Not supported Content-Type given: {}", content_type) |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
simple_response(StatusCode::BadRequest, "No Content-Type given") |
|
|
|
|
bail_status!(400, "No Content-Type given"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
_ => Response::new(StatusCode::NotFound), |
|
|
|
|
}; |
|
|
|
|
response.append_header("Server", SERVER); |
|
|
|
|
Ok(response) |
|
|
|
|
_ => bail_status!( |
|
|
|
|
404, |
|
|
|
|
"{} {} is not supported by this server", |
|
|
|
|
request.method(), |
|
|
|
|
request.url().path() |
|
|
|
|
), |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn simple_response(status: StatusCode, body: impl Into<Body>) -> Response { |
|
|
|
|
let mut response = Response::new(status); |
|
|
|
|
response.set_body(body); |
|
|
|
|
response |
|
|
|
|
fn url_query(request: &Request) -> Vec<u8> { |
|
|
|
|
request.url().query().unwrap_or("").as_bytes().to_vec() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn evaluate_urlencoded_sparql_query( |
|
|
|
|
fn configure_and_evaluate_sparql_query( |
|
|
|
|
store: RocksDbStore, |
|
|
|
|
encoded: Vec<u8>, |
|
|
|
|
mut query: Option<String>, |
|
|
|
|
request: Request, |
|
|
|
|
) -> Result<Response> { |
|
|
|
|
if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") { |
|
|
|
|
evaluate_sparql_query(store, query.to_string(), request).await |
|
|
|
|
let mut default_graph_uris = Vec::new(); |
|
|
|
|
let mut named_graph_uris = Vec::new(); |
|
|
|
|
for (k, v) in form_urlencoded::parse(&encoded) { |
|
|
|
|
match k.as_ref() { |
|
|
|
|
"query" => { |
|
|
|
|
if query.is_some() { |
|
|
|
|
bail_status!(400, "Multiple query parameters provided") |
|
|
|
|
} |
|
|
|
|
query = Some(v.into_owned()) |
|
|
|
|
} |
|
|
|
|
"default-graph-uri" => default_graph_uris.push(v.into_owned()), |
|
|
|
|
"named-graph-uri" => named_graph_uris.push(v.into_owned()), |
|
|
|
|
_ => bail_status!(400, "Unexpected parameter: {}", k), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if let Some(query) = query { |
|
|
|
|
evaluate_sparql_query(store, query, default_graph_uris, named_graph_uris, request) |
|
|
|
|
} else { |
|
|
|
|
Ok(simple_response( |
|
|
|
|
StatusCode::BadRequest, |
|
|
|
|
"You should set the 'query' parameter", |
|
|
|
|
)) |
|
|
|
|
bail_status!(400, "You should set the 'query' parameter") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn evaluate_sparql_query( |
|
|
|
|
fn evaluate_sparql_query( |
|
|
|
|
store: RocksDbStore, |
|
|
|
|
query: String, |
|
|
|
|
default_graph_uris: Vec<String>, |
|
|
|
|
named_graph_uris: Vec<String>, |
|
|
|
|
request: Request, |
|
|
|
|
) -> Result<Response> { |
|
|
|
|
//TODO: stream
|
|
|
|
|
let mut query = Query::parse(&query, None).map_err(|e| { |
|
|
|
|
let mut e = Error::from(e); |
|
|
|
|
e.set_status(StatusCode::BadRequest); |
|
|
|
|
e |
|
|
|
|
})?; |
|
|
|
|
if query.dataset().is_default_dataset() { |
|
|
|
|
query.dataset_mut().set_default_graph_as_union(); |
|
|
|
|
let mut query = Query::parse(&query, None).map_err(bad_request)?; |
|
|
|
|
let default_graph_uris = default_graph_uris |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|e| Ok(NamedNode::new(e)?.into())) |
|
|
|
|
.collect::<Result<Vec<GraphName>>>() |
|
|
|
|
.map_err(bad_request)?; |
|
|
|
|
let named_graph_uris = named_graph_uris |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|e| Ok(NamedNode::new(e)?.into())) |
|
|
|
|
.collect::<Result<Vec<NamedOrBlankNode>>>() |
|
|
|
|
.map_err(bad_request)?; |
|
|
|
|
|
|
|
|
|
if !default_graph_uris.is_empty() || !named_graph_uris.is_empty() { |
|
|
|
|
query.dataset_mut().set_default_graph(default_graph_uris); |
|
|
|
|
query |
|
|
|
|
.dataset_mut() |
|
|
|
|
.set_available_named_graphs(named_graph_uris); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let results = store.query(query)?; |
|
|
|
|
//TODO: stream
|
|
|
|
|
if let QueryResults::Graph(_) = results { |
|
|
|
|
let format = content_negotiation( |
|
|
|
|
request, |
|
|
|
@ -196,7 +224,7 @@ async fn evaluate_sparql_query( |
|
|
|
|
let mut body = Vec::default(); |
|
|
|
|
results.write_graph(&mut body, format)?; |
|
|
|
|
let mut response = Response::from(body); |
|
|
|
|
response.insert_header(headers::CONTENT_TYPE, format.media_type()); |
|
|
|
|
ContentType::new(format.media_type()).apply(&mut response); |
|
|
|
|
Ok(response) |
|
|
|
|
} else { |
|
|
|
|
let format = content_negotiation( |
|
|
|
@ -212,7 +240,7 @@ async fn evaluate_sparql_query( |
|
|
|
|
let mut body = Vec::default(); |
|
|
|
|
results.write(&mut body, format)?; |
|
|
|
|
let mut response = Response::from(body); |
|
|
|
|
response.insert_header(headers::CONTENT_TYPE, format.media_type()); |
|
|
|
|
ContentType::new(format.media_type()).apply(&mut response); |
|
|
|
|
Ok(response) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -229,10 +257,19 @@ async fn http_server< |
|
|
|
|
handle: F, |
|
|
|
|
) -> Result<()> { |
|
|
|
|
async_h1::accept(stream, |request| async { |
|
|
|
|
Ok(match handle(request).await { |
|
|
|
|
let mut response = match handle(request).await { |
|
|
|
|
Ok(result) => result, |
|
|
|
|
Err(error) => simple_response(error.status(), error.to_string()), |
|
|
|
|
}) |
|
|
|
|
Err(error) => { |
|
|
|
|
if error.status().is_server_error() { |
|
|
|
|
eprintln!("{}", error); |
|
|
|
|
} |
|
|
|
|
let mut response = Response::new(error.status()); |
|
|
|
|
response.set_body(error.to_string()); |
|
|
|
|
response |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
response.append_header(headers::SERVER, SERVER); |
|
|
|
|
Ok(response) |
|
|
|
|
}) |
|
|
|
|
.await |
|
|
|
|
} |
|
|
|
@ -251,46 +288,30 @@ async fn http_server< |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn bad_request(e: impl Into<Error>) -> Error { |
|
|
|
|
let mut e = e.into(); |
|
|
|
|
e.set_status(StatusCode::BadRequest); |
|
|
|
|
e |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn content_negotiation<F>( |
|
|
|
|
request: Request, |
|
|
|
|
supported: &[&str], |
|
|
|
|
parse: impl Fn(&str) -> Option<F>, |
|
|
|
|
) -> Result<F> { |
|
|
|
|
let header = request |
|
|
|
|
.header(headers::ACCEPT) |
|
|
|
|
.map(|h| h.last().as_str().trim()) |
|
|
|
|
.unwrap_or(""); |
|
|
|
|
if let Some(mut accept) = Accept::from_headers(request)? { |
|
|
|
|
let supported: Vec<Mime> = supported |
|
|
|
|
.iter() |
|
|
|
|
.map(|h| Mime::from_str(h).unwrap()) |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
let mut result = supported.first().unwrap(); |
|
|
|
|
let mut result_score = 0f32; |
|
|
|
|
|
|
|
|
|
if !header.is_empty() { |
|
|
|
|
for possible in header.split(',') { |
|
|
|
|
let possible = Mime::from_str(possible.trim())?; |
|
|
|
|
let score = if let Some(q) = possible.param("q") { |
|
|
|
|
f32::from_str(&q.to_string())? |
|
|
|
|
parse(accept.negotiate(&supported)?.value().as_str()) |
|
|
|
|
} else { |
|
|
|
|
1. |
|
|
|
|
}; |
|
|
|
|
if score <= result_score { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
for candidate in &supported { |
|
|
|
|
if (possible.basetype() == candidate.basetype() || possible.basetype() == "*") |
|
|
|
|
&& (possible.subtype() == candidate.subtype() || possible.subtype() == "*") |
|
|
|
|
{ |
|
|
|
|
result = candidate; |
|
|
|
|
result_score = score; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
parse(supported.first().ok_or_else(|| { |
|
|
|
|
Error::from_str( |
|
|
|
|
StatusCode::InternalServerError, |
|
|
|
|
"No default MIME type provided", |
|
|
|
|
) |
|
|
|
|
})?) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
parse(result.essence()) |
|
|
|
|
.ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type")) |
|
|
|
|
} |
|
|
|
|