Uses rouille as HTTP server

Way easier to write than with asyncio
Allows to post files to the server
pull/10/head
Tpt 5 years ago
parent 620b1c1df4
commit 17abe2d1ab
  1. 1
      lib/src/lib.rs
  2. 2
      lib/src/sparql/model.rs
  3. 101
      lib/src/syntax.rs
  4. 12
      server/Cargo.toml
  5. 415
      server/src/main.rs

@ -48,4 +48,5 @@ pub use crate::store::MemoryRepository;
#[cfg(feature = "rocksdb")]
pub use crate::store::RocksDbRepository;
pub use crate::syntax::DatasetSyntax;
pub use crate::syntax::FileSyntax;
pub use crate::syntax::GraphSyntax;

@ -31,7 +31,7 @@ impl<'a> QueryResult<'a> {
}
/// [SPARQL query](https://www.w3.org/TR/sparql11-query/) serialization formats
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
pub enum QueryResultSyntax {
/// [SPARQL Query Results XML Format](http://www.w3.org/TR/rdf-sparql-XMLres/)
Xml,

@ -1,5 +1,28 @@
/// A file serialization format.
///
/// Is implemented by `GraphSyntax` for graph files and `DatasetSyntax` for dataset files.
pub trait FileSyntax: Sized {
/// Its canonical IRI.
fn iri(self) -> &'static str;
/// Its [IANA media type](https://tools.ietf.org/html/rfc2046).
fn media_type(self) -> &'static str;
/// Its [IANA-registered](https://tools.ietf.org/html/rfc2046) file extension.
fn file_extension(self) -> &'static str;
/// Looks for a known syntax from a media type.
///
/// Example:
/// ```
/// use rudf::{GraphSyntax, FileSyntax};
/// assert_eq!(GraphSyntax::from_mime_type("text/turtle; charset=utf-8"), Some(GraphSyntax::Turtle))
/// ```
fn from_mime_type(media_type: &str) -> Option<Self>;
}
/// [RDF graph](https://www.w3.org/TR/rdf11-concepts/#dfn-graph) serialization formats.
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
pub enum GraphSyntax {
/// [N-Triples](https://www.w3.org/TR/n-triples/)
NTriples,
@ -9,11 +32,85 @@ pub enum GraphSyntax {
RdfXml,
}
impl FileSyntax for GraphSyntax {
fn iri(self) -> &'static str {
match self {
GraphSyntax::NTriples => "http://www.w3.org/ns/formats/N-Triples",
GraphSyntax::Turtle => "http://www.w3.org/ns/formats/Turtle",
GraphSyntax::RdfXml => "http://www.w3.org/ns/formats/RDF_XML",
}
}
fn media_type(self) -> &'static str {
match self {
GraphSyntax::NTriples => "application/n-triples",
GraphSyntax::Turtle => "text/turtle",
GraphSyntax::RdfXml => "application/rdf+xml",
}
}
fn file_extension(self) -> &'static str {
match self {
GraphSyntax::NTriples => "nt",
GraphSyntax::Turtle => "ttl",
GraphSyntax::RdfXml => "rdf",
}
}
fn from_mime_type(media_type: &str) -> Option<Self> {
if let Some(base_type) = media_type.split(';').next() {
match base_type {
"application/n-triples" => Some(GraphSyntax::NTriples),
"text/turtle" => Some(GraphSyntax::Turtle),
"application/rdf+xml" => Some(GraphSyntax::RdfXml),
_ => None,
}
} else {
None
}
}
}
/// [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset) serialization formats.
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
pub enum DatasetSyntax {
/// [N-Quads](https://www.w3.org/TR/n-quads/)
NQuads,
/// [TriG](https://www.w3.org/TR/trig/)
TriG,
}
impl FileSyntax for DatasetSyntax {
fn iri(self) -> &'static str {
match self {
DatasetSyntax::NQuads => "http://www.w3.org/ns/formats/N-Quads",
DatasetSyntax::TriG => "http://www.w3.org/ns/formats/TriG",
}
}
fn media_type(self) -> &'static str {
match self {
DatasetSyntax::NQuads => "application/n-quads",
DatasetSyntax::TriG => "application/trig",
}
}
fn file_extension(self) -> &'static str {
match self {
DatasetSyntax::NQuads => "nq",
DatasetSyntax::TriG => "trig",
}
}
fn from_mime_type(media_type: &str) -> Option<Self> {
if let Some(base_type) = media_type.split(';').next() {
match base_type {
"application/n-quads" => Some(DatasetSyntax::NQuads),
"application/trig" => Some(DatasetSyntax::TriG),
_ => None,
}
} else {
None
}
}
}

@ -12,15 +12,5 @@ edition = "2018"
[dependencies]
rudf = {path = "../lib", features=["rocksdb"]}
gotham = "0.4"
gotham_derive = "0.4"
hyper = "0.12"
futures = "0.1"
serde = "1"
serde_derive = "1"
mime = "0.3"
failure = "0.1"
url = "2"
clap = "2"
tera = "0.11"
lazy_static = "1"
rouille = "3"

@ -1,64 +1,21 @@
use clap::App;
use clap::Arg;
use clap::ArgMatches;
use failure::format_err;
use futures::future;
use futures::Future;
use futures::Stream;
use gotham::handler::{HandlerFuture, IntoHandlerError};
use gotham::helpers::http::response::create_response;
use gotham::middleware::state::StateMiddleware;
use gotham::pipeline::single::single_pipeline;
use gotham::pipeline::single_middleware;
use gotham::router::builder::build_router;
use gotham::router::builder::DefineSingleRoute;
use gotham::router::builder::DrawRoutes;
use gotham::router::Router;
use gotham::start;
use gotham::state::FromState;
use gotham::state::State;
use gotham_derive::*;
use hyper;
use hyper::header::CONTENT_TYPE;
use hyper::Body;
use hyper::HeaderMap;
use hyper::Response;
use hyper::StatusCode;
use lazy_static::lazy_static;
use mime;
use mime::Mime;
use rouille::url::form_urlencoded;
use rouille::{start_server, Request, Response};
use rudf::sparql::QueryResult;
use rudf::sparql::{PreparedQuery, QueryResultSyntax};
use rudf::RepositoryConnection;
use rudf::{GraphSyntax, Repository};
use rudf::{MemoryRepository, RocksDbRepository};
use serde_derive::Deserialize;
use rudf::{
DatasetSyntax, FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection,
RocksDbRepository,
};
use std::fmt::Write;
use std::fs::File;
use std::io::BufReader;
use std::panic::RefUnwindSafe;
use std::str::FromStr;
use std::io::{BufReader, Read};
use std::sync::Arc;
use tera::compile_templates;
use tera::Context;
use tera::Tera;
use url::form_urlencoded;
lazy_static! {
static ref TERA: Tera = {
let mut tera = compile_templates!("templates/**/*");
tera.autoescape_on(vec![]);
tera
};
static ref APPLICATION_SPARQL_QUERY_UTF_8: Mime =
"application/sparql-query; charset=utf-8".parse().unwrap();
static ref APPLICATION_SPARQL_RESULTS_UTF_8: Mime =
"application/sparql-results; charset=utf-8".parse().unwrap();
static ref APPLICATION_N_TRIPLES_UTF_8: Mime =
"application/n-triples; charset=utf-8".parse().unwrap();
}
const HTML_ROOT_PAGE: &str = include_str!("../templates/query.html");
pub fn main() -> Result<(), failure::Error> {
pub fn main() {
let matches = App::new("Rudf SPARQL server")
.arg(
Arg::with_name("bind")
@ -67,12 +24,6 @@ pub fn main() -> Result<(), failure::Error> {
.help("Specify a server socket to bind using the format $(HOST):$(PORT)")
.takes_value(true),
)
.arg(
Arg::with_name("ntriples")
.long("ntriples")
.help("Load a N-Triples file in the server at startup")
.takes_value(true),
)
.arg(
Arg::with_name("file")
.long("file")
@ -84,271 +35,171 @@ pub fn main() -> Result<(), failure::Error> {
let file = matches.value_of("file").map(|v| v.to_string());
if let Some(file) = file {
main_with_dataset(Arc::new(RocksDbRepository::open(file)?), &matches)
main_with_dataset(Arc::new(RocksDbRepository::open(file).unwrap()), &matches)
} else {
main_with_dataset(Arc::new(MemoryRepository::default()), &matches)
}
}
fn main_with_dataset<D: Send + Sync + RefUnwindSafe + 'static>(
dataset: Arc<D>,
matches: &ArgMatches<'_>,
) -> Result<(), failure::Error>
where
for<'a> &'a D: Repository,
{
if let Some(nt_file) = matches.value_of("ntriples") {
println!("Loading NTriples file {}", nt_file);
let connection = dataset.connection()?;
if let Some(nt_file) = matches.value_of("ntriples") {
println!("Loading NTriples file {}", nt_file);
connection.load_graph(
BufReader::new(File::open(nt_file)?),
GraphSyntax::NTriples,
None,
None,
)?;
}
}
let addr = matches.value_of("bind").unwrap_or("127.0.0.1:7878");
println!("Listening for requests at http://{}", addr);
start(addr.to_string(), router(dataset, addr.to_string()));
Ok(())
}
fn router<D: Send + Sync + RefUnwindSafe + 'static>(dataset: Arc<D>, base: String) -> Router
fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, matches: &ArgMatches)
where
for<'a> &'a D: Repository,
for<'a> &'a R: Repository,
{
let middleware = StateMiddleware::new(GothamState { dataset, base });
let pipeline = single_middleware(middleware);
let (chain, pipelines) = single_pipeline(pipeline);
build_router(chain, pipelines, |route| {
route
.get("/")
.to(|mut state: State| -> (State, Response<Body>) {
let gotham_state: GothamState<D> = GothamState::take_from(&mut state);
let mut context = Context::new();
context.insert("endpoint", &format!("//{}/query", gotham_state.base));
let response = create_response(
&state,
StatusCode::OK,
mime::TEXT_HTML_UTF_8,
TERA.render("query.html", &context).unwrap(),
);
(state, response)
});
route.associate("/query", |assoc| {
assoc
.get()
.with_query_string_extractor::<QueryRequest>()
.to(|mut state: State| -> (State, Response<Body>) {
let parsed_request = QueryRequest::take_from(&mut state);
let response =
evaluate_sparql_query::<D>(&mut state, &parsed_request.query.as_bytes());
(state, response)
});
assoc.post().to(|mut state: State| -> Box<HandlerFuture> {
Box::new(
Body::take_from(&mut state)
.concat2()
.then(|body| match body {
Ok(body) => {
let content_type: Option<Result<Mime, failure::Error>> = HeaderMap::borrow_from(&state)
.get(CONTENT_TYPE)
.map(|content_type| Ok(Mime::from_str(content_type.to_str()?)?));
let response = match content_type {
Some(Ok(content_type)) => match (content_type.type_(), content_type.subtype()) {
(mime::APPLICATION, subtype) if subtype == APPLICATION_SPARQL_QUERY_UTF_8.subtype() => {
evaluate_sparql_query::<D>(
&mut state,
&body.into_bytes(),
)
}
(mime::APPLICATION, mime::WWW_FORM_URLENCODED) => {
match parse_urlencoded_query_request(&body.into_bytes())
{
Ok(parsed_request) => evaluate_sparql_query::<D>(
&mut state,
&parsed_request.query.as_bytes(),
),
Err(error) => error_to_response(
&state,
&error,
StatusCode::BAD_REQUEST,
),
}
}
_ => error_to_response(
&state,
&format_err!("Unsupported Content-Type: {:?}", content_type),
StatusCode::BAD_REQUEST,
)
}
Some(Err(error)) => error_to_response(
&state,
&format_err!("The request contains an invalid Content-Type header: {}", error),
StatusCode::BAD_REQUEST,
),
None => error_to_response(
&state,
&format_err!("The request should contain a Content-Type header"),
StatusCode::BAD_REQUEST,
),
};
future::ok((state, response))
}
Err(e) => future::err((state, e.into_handler_error())),
}),
)
});
})
let addr = matches
.value_of("bind")
.unwrap_or("127.0.0.1:7878")
.to_owned();
println!("Listening for requests at http://{}", &addr);
start_server(addr.to_string(), move |request| {
handle_request(request, repository.connection().unwrap(), &addr)
})
}
#[derive(StateData)]
struct GothamState<D: Send + Sync + RefUnwindSafe + 'static>
where
for<'a> &'a D: Repository,
{
dataset: Arc<D>,
base: String,
}
impl<D: Send + Sync + RefUnwindSafe + 'static> Clone for GothamState<D>
where
for<'a> &'a D: Repository,
{
fn clone(&self) -> Self {
Self {
dataset: self.dataset.clone(),
base: self.base.clone(),
fn handle_request<R: RepositoryConnection>(
request: &Request,
connection: R,
host: &str,
) -> Response {
match (request.url().as_str(), request.method()) {
("/", "GET") => {
Response::html(HTML_ROOT_PAGE.replace("{{endpoint}}", &format!("//{}/query", host)))
}
("/", "POST") => {
if let Some(body) = request.data() {
if let Some(content_type) = request.header("Content-Type") {
match if let Some(format) = GraphSyntax::from_mime_type(content_type) {
connection.load_graph(BufReader::new(body), format, None, None)
} else if let Some(format) = DatasetSyntax::from_mime_type(content_type) {
connection.load_dataset(BufReader::new(body), format, None)
} else {
return Response::text(format!(
"No supported content Content-Type given: {}",
content_type
))
.with_status_code(415);
} {
Ok(()) => Response::empty_204(),
Err(error) => Response::text(error.to_string()).with_status_code(400),
}
} else {
Response::text("No Content-Type given").with_status_code(400)
}
} else {
Response::text("No content given").with_status_code(400)
}
}
("/query", "GET") => {
evaluate_urlencoded_sparql_query(connection, request.raw_query_string().as_bytes())
}
("/query", "POST") => {
if let Some(mut body) = request.data() {
if let Some(content_type) = request.header("Content-Type") {
if content_type.starts_with("application/sparql-query") {
evaluate_sparql_query(connection, body)
} else if content_type.starts_with("application/x-www-form-urlencoded") {
let mut buffer = Vec::default();
body.read_to_end(&mut buffer).unwrap();
evaluate_urlencoded_sparql_query(connection, &buffer)
} else {
Response::text(format!(
"No supported content Content-Type given: {}",
content_type
))
.with_status_code(415)
}
} else {
Response::text("No Content-Type given").with_status_code(400)
}
} else {
Response::text("No content given").with_status_code(400)
}
}
_ => Response::empty_404(),
}
}
#[derive(Deserialize, StateData, StaticResponseExtender)]
struct QueryRequest {
query: String,
}
fn parse_urlencoded_query_request(query: &[u8]) -> Result<QueryRequest, failure::Error> {
form_urlencoded::parse(query)
.find(|(key, _)| key == "query")
.map(|(_, value)| QueryRequest {
query: value.to_string(),
})
.ok_or_else(|| format_err!("'query' parameter not found"))
fn evaluate_urlencoded_sparql_query<R: RepositoryConnection>(
connection: R,
encoded: &[u8],
) -> Response {
if let Some((_, query)) = form_urlencoded::parse(encoded).find(|(k, _)| k == "query") {
evaluate_sparql_query(connection, query.as_bytes())
} else {
Response::text("You should set the 'query' parameter").with_status_code(400)
}
}
fn evaluate_sparql_query<D: Send + Sync + RefUnwindSafe + 'static>(
state: &mut State,
query: &[u8],
) -> Response<Body>
where
for<'a> &'a D: Repository,
{
let gotham_state: GothamState<D> = GothamState::take_from(state);
let connection = match gotham_state.dataset.connection() {
Ok(connection) => connection,
Err(error) => return error_to_response(&state, &error, StatusCode::INTERNAL_SERVER_ERROR),
};
let result = match connection.prepare_query(query) {
Ok(query) => match query.exec() {
Ok(QueryResult::Graph(triples)) => {
fn evaluate_sparql_query<R: RepositoryConnection>(connection: R, query: impl Read) -> Response {
//TODO: stream
match connection.prepare_query(query) {
Ok(query) => match query.exec().unwrap() {
QueryResult::Graph(triples) => {
let mut result = String::default();
for triple in triples {
match triple {
Ok(triple) => writeln!(&mut result, "{}", triple).unwrap(),
Err(error) => {
return error_to_response(
&state,
&error,
StatusCode::INTERNAL_SERVER_ERROR,
);
}
}
writeln!(&mut result, "{}", triple.unwrap()).unwrap()
}
create_response(
&state,
StatusCode::OK,
APPLICATION_N_TRIPLES_UTF_8.clone(),
result,
)
Response::from_data(GraphSyntax::NTriples.media_type(), result.into_bytes())
}
Ok(result) => create_response(
&state,
StatusCode::OK,
APPLICATION_SPARQL_RESULTS_UTF_8.clone(),
result => Response::from_data(
"application/sparql-results",
result
.write(Vec::default(), QueryResultSyntax::Xml)
.unwrap(),
),
Err(error) => error_to_response(&state, &error, StatusCode::INTERNAL_SERVER_ERROR),
},
Err(error) => error_to_response(&state, &error, StatusCode::BAD_REQUEST),
};
result
}
fn error_to_response(state: &State, error: &failure::Error, code: StatusCode) -> Response<Body> {
create_response(state, code, mime::TEXT_PLAIN_UTF_8, error.to_string())
Err(error) => Response::text(error.to_string()).with_status_code(400),
}
}
#[cfg(test)]
mod tests {
use super::*;
use gotham::test::TestServer;
use mime::Mime;
use std::str::FromStr;
use crate::handle_request;
use rouille::Request;
use rudf::{MemoryRepository, Repository};
use std::io::Read;
#[test]
fn get_ui() {
let test_server = TestServer::new(router(
Arc::new(MemoryRepository::default()),
"".to_string(),
))
.unwrap();
let response = test_server
.client()
.get("http://localhost/")
.perform()
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
exec(Request::fake_http("GET", "/", vec![], vec![]))
}
#[test]
fn get_query() {
let test_server = TestServer::new(router(
Arc::new(MemoryRepository::default()),
"".to_string(),
exec(Request::fake_http(
"GET",
"/query?query=SELECT+*+WHERE+{+?s+?p+?o+}",
vec![(
"Content-Type".to_string(),
"application/sparql-query".to_string(),
)],
b"SELECT * WHERE { ?s ?p ?o }".to_vec(),
))
.unwrap();
let response = test_server
.client()
.get("http://localhost/query?query=SELECT+*+WHERE+{+?s+?p+?o+}")
.perform()
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
#[test]
fn post_query() {
let test_server = TestServer::new(router(
Arc::new(MemoryRepository::default()),
"".to_string(),
exec(Request::fake_http(
"POST",
"/query",
vec![(
"Content-Type".to_string(),
"application/sparql-query".to_string(),
)],
b"SELECT * WHERE { ?s ?p ?o }".to_vec(),
))
.unwrap();
let response = test_server
.client()
.post(
"http://localhost/query",
"SELECT * WHERE { ?s ?p ?o }",
Mime::from_str("application/sparql-query").unwrap(),
)
.perform()
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
fn exec(request: Request) {
let response = handle_request(
&request,
MemoryRepository::default().connection().unwrap(),
"localhost",
);
let mut body = String::default();
request
.data()
.map(|mut r| r.read_to_string(&mut body).unwrap());
assert_eq!(response.status_code, 200, "{}", body);
}
}

Loading…
Cancel
Save