From bf35eec82f33e341bb320eadd061e1c596da0a8b Mon Sep 17 00:00:00 2001 From: Tpt Date: Wed, 22 Apr 2020 20:53:17 +0200 Subject: [PATCH] Uses async_h1 in Oxigraph server --- bench/bsbm_oxigraph.sh | 10 +- server/Cargo.toml | 7 +- server/src/main.rs | 537 +++++++++++++++++++++++++++++------------ 3 files changed, 387 insertions(+), 167 deletions(-) diff --git a/bench/bsbm_oxigraph.sh b/bench/bsbm_oxigraph.sh index 0284073b..a2bec3f9 100755 --- a/bench/bsbm_oxigraph.sh +++ b/bench/bsbm_oxigraph.sh @@ -1,19 +1,19 @@ #!/usr/bin/env bash DATASET_SIZE=100000 # number of products in the dataset. There is around 350 triples generated by product. -MEMORY_SIZE=1000000 # availlable memory for Oxigraph in GB. Useful to simulate low RAM machines. +MEMORY_SIZE=1000000 # available memory for Oxigraph in KB. Useful to simulate low RAM machines. cd bsbm-tools ./generate -fc -pc ${DATASET_SIZE} -s nt -fn "explore-${DATASET_SIZE}" cargo build --release --manifest-path="../../server/Cargo.toml" ( ulimit -d ${MEMORY_SIZE} - ./../../target/release/oxigraph_server --file oxigraph_data + ./../../target/release/oxigraph_server --file oxigraph_data --bind 127.0.0.1:7878 ) & sleep 5 -curl -f -X POST -H 'Content-Type:application/n-triples' --data-binary "@explore-${DATASET_SIZE}.nt" http://localhost:7878/ -./testdriver -ucf usecases/explore/sparql.txt -o "../bsbm.explore.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" http://localhost:7878/query -./testdriver -ucf usecases/businessIntelligence/sparql.txt -o "../bsbm.businessIntelligence.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" http://localhost:7878/query +curl -f -X POST -H 'Content-Type:application/n-triples' --data-binary "@explore-${DATASET_SIZE}.nt" http://127.0.0.1:7878/ +./testdriver -ucf usecases/explore/sparql.txt -o "../bsbm.explore.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" http://127.0.0.1:7878/query +./testdriver -ucf usecases/businessIntelligence/sparql.txt -o "../bsbm.businessIntelligence.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" "http://127.0.0.1:7878/query" kill $! rm -r oxigraph_data rm "explore-${DATASET_SIZE}.nt" diff --git a/server/Cargo.toml b/server/Cargo.toml index 457f8418..6dc56705 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,6 +11,9 @@ SPARQL server based on Oxigraph edition = "2018" [dependencies] -oxigraph = {path = "../lib", features = ["rocksdb"] } +oxigraph = { path = "../lib", features = ["rocksdb"] } +async-std = { version = "1", features = ["attributes"] } +async-h1 = "1" clap = "2" -rouille = "3" \ No newline at end of file +http-types = "1" +url = "2" \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 67d8597b..b01c1699 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -9,25 +9,31 @@ unused_qualifications )] +use async_std::future::Future; +use async_std::io::{BufRead, Read}; +use async_std::net::{TcpListener, TcpStream}; +use async_std::prelude::*; +use async_std::sync::Arc; +use async_std::task::{block_on, spawn, spawn_blocking}; use clap::App; use clap::Arg; use clap::ArgMatches; +use http_types::headers::HeaderName; +use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax}; use oxigraph::{ DatasetSyntax, FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection, RocksDbRepository, }; -use rouille::input::priority_header_preferred; -use rouille::url::form_urlencoded; -use rouille::{content_encoding, start_server, Request, Response}; -use std::io::{BufReader, Read}; -use std::sync::Arc; +use std::str::FromStr; +use url::form_urlencoded; const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576; const HTML_ROOT_PAGE: &str = include_str!("../templates/query.html"); const SERVER: &str = concat!("Oxigraph/", env!("CARGO_PKG_VERSION")); -pub fn main() { +#[async_std::main] +pub async fn main() -> Result<()> { let matches = App::new("Oxigraph SPARQL server") .arg( Arg::with_name("bind") @@ -48,217 +54,428 @@ pub fn main() { let file = matches.value_of("file").map(|v| v.to_string()); if let Some(file) = file { - main_with_dataset(Arc::new(RocksDbRepository::open(file).unwrap()), &matches) + main_with_dataset(Arc::new(RocksDbRepository::open(file)?), &matches).await } else { - main_with_dataset(Arc::new(MemoryRepository::default()), &matches) + main_with_dataset(Arc::new(MemoryRepository::default()), &matches).await } } -fn main_with_dataset(repository: Arc, matches: &ArgMatches<'_>) +async fn main_with_dataset( + repository: Arc, + matches: &ArgMatches<'_>, +) -> Result<()> where for<'a> &'a R: Repository, { let addr = matches.value_of("bind").unwrap().to_owned(); println!("Listening for requests at http://{}", &addr); - start_server(addr, move |request| { - content_encoding::apply( - request, - handle_request(request, repository.connection().unwrap()), - ) - .with_unique_header("Server", SERVER) + http_server(addr, move |request| { + handle_request(request, Arc::clone(&repository)) }) + .await } -fn handle_request(request: &Request, mut connection: R) -> Response { - match (request.url().as_str(), request.method()) { - ("/", "GET") => Response::html(HTML_ROOT_PAGE), - ("/", "POST") => { - if let Some(body) = request.data() { - if let Some(content_type) = request.header("Content-Type") { - match if let Some(format) = GraphSyntax::from_mime_type(content_type) { - connection.load_graph(BufReader::new(body), format, None, None) - } else if let Some(format) = DatasetSyntax::from_mime_type(content_type) { - connection.load_dataset(BufReader::new(body), format, None) - } else { - return Response::text(format!( - "No supported content Content-Type given: {}", - content_type - )) - .with_status_code(415); - } { - Ok(()) => Response::empty_204(), - Err(error) => Response::text(error.to_string()).with_status_code(400), - } +async fn handle_request( + request: Request, + repository: Arc, +) -> Result +where + for<'a> &'a R: Repository, +{ + let mut response = match (request.url().path(), request.method()) { + ("/", Method::Get) => { + let mut response = Response::new(StatusCode::Ok); + response.append_header(headers::CONTENT_TYPE, "text/html")?; + response.set_body(HTML_ROOT_PAGE); + response + } + ("/", Method::Post) => { + if let Some(content_type) = request.content_type() { + match if let Some(format) = GraphSyntax::from_mime_type(essence(&content_type)) { + spawn_blocking(move || { + repository.connection()?.load_graph( + SyncAsyncBufReader::from(request), + format, + None, + None, + ) + }) + } else if let Some(format) = DatasetSyntax::from_mime_type(essence(&content_type)) { + spawn_blocking(move || { + repository.connection()?.load_dataset( + SyncAsyncBufReader::from(request), + format, + None, + ) + }) } else { - Response::text("No Content-Type given").with_status_code(400) + return Ok(simple_response( + StatusCode::UnsupportedMediaType, + format!("No supported content Content-Type given: {}", content_type), + )); + } + .await + { + Ok(()) => Response::new(StatusCode::NoContent), + Err(error) => { + let mut error = Error::from(error); + error.set_status(StatusCode::BadRequest); + return Err(error); + } } } else { - Response::text("No content given").with_status_code(400) + simple_response(StatusCode::BadRequest, "No Content-Type given") } } - ("/query", "GET") => evaluate_urlencoded_sparql_query( - connection, - request.raw_query_string().as_bytes(), - request, - ), - ("/query", "POST") => { - if let Some(body) = request.data() { - if let Some(content_type) = request.header("Content-Type") { - if content_type.starts_with("application/sparql-query") { - let mut buffer = String::default(); - body.take(MAX_SPARQL_BODY_SIZE) - .read_to_string(&mut buffer) - .unwrap(); - evaluate_sparql_query(connection, &buffer, request) - } else if content_type.starts_with("application/x-www-form-urlencoded") { - let mut buffer = Vec::default(); - body.take(MAX_SPARQL_BODY_SIZE) - .read_to_end(&mut buffer) - .unwrap(); - evaluate_urlencoded_sparql_query(connection, &buffer, request) - } else { - Response::text(format!( - "No supported content Content-Type given: {}", - content_type - )) - .with_status_code(415) - } + ("/query", Method::Get) => { + evaluate_urlencoded_sparql_query( + repository, + request.url().query().unwrap_or("").as_bytes().to_vec(), + request, + ) + .await? + } + ("/query", Method::Post) => { + if let Some(content_type) = request.content_type() { + if essence(&content_type) == "application/sparql-query" { + let mut buffer = String::new(); + let mut request = request; + request + .take_body() + .take(MAX_SPARQL_BODY_SIZE) + .read_to_string(&mut buffer) + .await?; + evaluate_sparql_query(repository, buffer, request).await? + } else if essence(&content_type) == "application/x-www-form-urlencoded" { + let mut buffer = Vec::new(); + let mut request = request; + request + .take_body() + .take(MAX_SPARQL_BODY_SIZE) + .read_to_end(&mut buffer) + .await?; + evaluate_urlencoded_sparql_query(repository, buffer, request).await? } else { - Response::text("No Content-Type given").with_status_code(400) + simple_response( + StatusCode::UnsupportedMediaType, + format!("No supported Content-Type given: {}", content_type), + ) } } else { - Response::text("No content given").with_status_code(400) + simple_response(StatusCode::BadRequest, "No Content-Type given") } } - _ => Response::empty_404(), - } + _ => Response::new(StatusCode::NotFound), + }; + response.append_header("Server", SERVER)?; + Ok(response) } -fn evaluate_urlencoded_sparql_query( - connection: R, - encoded: &[u8], - request: &Request, -) -> Response { - if let Some((_, query)) = form_urlencoded::parse(encoded).find(|(k, _)| k == "query") { - evaluate_sparql_query(connection, &query, request) +/// TODO: bad hack to overcome http_types limitations +fn essence(mime: &Mime) -> &str { + mime.essence().split(';').next().unwrap_or("") +} + +fn simple_response(status: StatusCode, body: impl Into) -> Response { + let mut response = Response::new(status); + response.set_body(body); + response +} + +async fn evaluate_urlencoded_sparql_query( + repository: Arc, + encoded: Vec, + request: Request, +) -> Result +where + for<'a> &'a R: Repository, +{ + if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") { + evaluate_sparql_query(repository, query.to_string(), request).await } else { - Response::text("You should set the 'query' parameter").with_status_code(400) + Ok(simple_response( + StatusCode::BadRequest, + "You should set the 'query' parameter", + )) } } -fn evaluate_sparql_query( - connection: R, - query: &str, - request: &Request, -) -> Response { - //TODO: stream - match connection.prepare_query(query, QueryOptions::default()) { - Ok(query) => { - let results = query.exec().unwrap(); - if let QueryResult::Graph(_) = results { - let supported_formats = [ +async fn evaluate_sparql_query( + repository: Arc, + query: String, + request: Request, +) -> Result +where + for<'a> &'a R: Repository, +{ + spawn_blocking(move || { + //TODO: stream + let query = repository + .connection()? + .prepare_query(&query, QueryOptions::default()) + .map_err(|e| { + let mut e = Error::from(e); + e.set_status(StatusCode::BadRequest); + e + })?; + let results = query.exec()?; + if let QueryResult::Graph(_) = results { + let format = content_negotiation( + request, + &[ GraphSyntax::NTriples.media_type(), GraphSyntax::Turtle.media_type(), GraphSyntax::RdfXml.media_type(), - ]; - let format = if let Some(accept) = request.header("Accept") { - if let Some(media_type) = - priority_header_preferred(accept, supported_formats.iter().cloned()) - .and_then(|p| GraphSyntax::from_mime_type(supported_formats[p])) - { - media_type - } else { - return Response::text(format!( - "No supported Accept given: {}. Supported format: {:?}", - accept, supported_formats - )) - .with_status_code(415); - } - } else { - GraphSyntax::NTriples - }; + ], + )?; - Response::from_data( - format.media_type(), - results.write_graph(Vec::default(), format).unwrap(), - ) - } else { - let supported_formats = [ + let mut response = Response::from(results.write_graph(Vec::default(), format)?); + response.insert_header(headers::CONTENT_TYPE, format.media_type())?; + Ok(response) + } else { + let format = content_negotiation( + request, + &[ QueryResultSyntax::Xml.media_type(), QueryResultSyntax::Json.media_type(), - ]; - let format = if let Some(accept) = request.header("Accept") { - if let Some(media_type) = - priority_header_preferred(accept, supported_formats.iter().cloned()) - .and_then(|p| QueryResultSyntax::from_mime_type(supported_formats[p])) - { - media_type - } else { - return Response::text(format!( - "No supported Accept given: {}. Supported format: {:?}", - accept, supported_formats - )) - .with_status_code(415); - } - } else { - QueryResultSyntax::Json - }; + ], + )?; + let mut response = Response::from(results.write(Vec::default(), format)?); + response.insert_header(headers::CONTENT_TYPE, format.media_type())?; + Ok(response) + } + }) + .await +} - Response::from_data( - format.media_type(), - results.write(Vec::default(), format).unwrap(), - ) +async fn http_server< + F: Clone + Send + Sync + 'static + Fn(Request) -> Fut, + Fut: Send + Future>, +>( + host: String, + handle: F, +) -> Result<()> { + async fn accept Fut, Fut: Future>>( + addr: String, + stream: TcpStream, + handle: F, + ) -> Result<()> { + async_h1::accept(&addr, stream, |request| async { + Ok(match handle(request).await { + Ok(result) => result, + Err(error) => simple_response(error.status(), error.to_string()), + }) + }) + .await + } + + let listener = TcpListener::bind(&host).await?; + let mut incoming = listener.incoming(); + while let Some(stream) = incoming.next().await { + let stream = stream?.clone(); //TODO: clone stream? + let handle = handle.clone(); + let addr = format!("http://{}", host); + spawn(async { + if let Err(err) = accept(addr, stream, handle).await { + eprintln!("{}", err); + }; + }); + } + Ok(()) +} + +fn content_negotiation(request: Request, supported: &[&str]) -> Result { + let header = request + .header(&HeaderName::from_str("Accept").unwrap()) + .and_then(|h| h.last()) + .map(|h| h.as_str().trim()) + .unwrap_or(""); + let supported: Vec = supported + .iter() + .map(|h| Mime::from_str(h).unwrap()) + .collect(); + + let mut result = supported.first().unwrap(); + let mut result_score = 0f32; + + if !header.is_empty() { + for possible in header.split(',') { + let possible = Mime::from_str(possible.trim())?; + let score = if let Some(q) = possible.param("q") { + f32::from_str(q)? + } else { + 1. + }; + if score <= result_score { + continue; + } + for candidate in &supported { + if (possible.basetype() == candidate.basetype() || possible.basetype() == "*") + && (possible.subtype() == candidate.subtype() || possible.subtype() == "*") + { + result = candidate; + result_score = score; + break; + } } } - Err(error) => Response::text(error.to_string()).with_status_code(400), + } + + F::from_mime_type(essence(result)) + .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type")) +} + +struct SyncAsyncBufReader { + inner: R, +} + +impl From for SyncAsyncBufReader { + fn from(inner: R) -> Self { + Self { inner } + } +} + +impl std::io::Read for SyncAsyncBufReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + block_on(self.inner.read(buf)) + } + + //TODO: implement other methods +} + +impl std::io::BufRead for SyncAsyncBufReader { + fn fill_buf(&mut self) -> std::io::Result<&[u8]> { + unimplemented!() + } + + fn consume(&mut self, amt: usize) { + unimplemented!() + } + + fn read_until(&mut self, byte: u8, buf: &mut Vec) -> std::io::Result { + block_on(self.inner.read_until(byte, buf)) + } + + fn read_line(&mut self, buf: &mut String) -> std::io::Result { + block_on(self.inner.read_line(buf)) } } #[cfg(test)] mod tests { use crate::handle_request; - use oxigraph::{MemoryRepository, Repository}; - use rouille::Request; - use std::io::Read; + use async_std::sync::Arc; + use async_std::task::block_on; + use http_types::{Method, Request, StatusCode, Url}; + use oxigraph::MemoryRepository; #[test] fn get_ui() { - exec(Request::fake_http("GET", "/", vec![], vec![])) + exec( + Request::new(Method::Get, Url::parse("http://localhost/").unwrap()), + StatusCode::Ok, + ) + } + + #[test] + fn post_file() { + let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap()); + request + .insert_header("Content-Type", "text/turtle") + .unwrap(); + request.set_body(" ."); + exec(request, StatusCode::NoContent) + } + + #[test] + fn post_wrong_file() { + let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap()); + request + .insert_header("Content-Type", "text/turtle") + .unwrap(); + request.set_body(""); + exec(request, StatusCode::BadRequest) + } + + #[test] + fn post_unsupported_file() { + let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap()); + request.insert_header("Content-Type", "text/plain").unwrap(); + exec(request, StatusCode::UnsupportedMediaType) } #[test] fn get_query() { - exec(Request::fake_http( - "GET", - "/query?query=SELECT+*+WHERE+{+?s+?p+?o+}", - vec![( - "Content-Type".to_string(), - "application/sparql-query".to_string(), - )], - b"SELECT * WHERE { ?s ?p ?o }".to_vec(), - )) + exec( + Request::new( + Method::Get, + Url::parse( + "http://localhost/query?query=SELECT%20*%20WHERE%20{%20?s%20?p%20?o%20}", + ) + .unwrap(), + ), + StatusCode::Ok, + ); + } + + #[test] + fn get_bad_query() { + exec( + Request::new( + Method::Get, + Url::parse("http://localhost/query?query=SELECT").unwrap(), + ), + StatusCode::BadRequest, + ); + } + + #[test] + fn get_without_query() { + exec( + Request::new(Method::Get, Url::parse("http://localhost/query").unwrap()), + StatusCode::BadRequest, + ); } #[test] fn post_query() { - exec(Request::fake_http( - "POST", - "/query", - vec![( - "Content-Type".to_string(), - "application/sparql-query".to_string(), - )], - b"SELECT * WHERE { ?s ?p ?o }".to_vec(), - )) + let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap()); + request + .insert_header("Content-Type", "application/sparql-query") + .unwrap(); + request.set_body("SELECT * WHERE { ?s ?p ?o }"); + exec(request, StatusCode::Ok) } - fn exec(request: Request) { - let response = handle_request(&request, MemoryRepository::default().connection().unwrap()); - let mut body = String::default(); + #[test] + fn post_bad_query() { + let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap()); request - .data() - .map(|mut r| r.read_to_string(&mut body).unwrap()); - assert_eq!(response.status_code, 200, "{}", body); + .insert_header("Content-Type", "application/sparql-query") + .unwrap(); + request.set_body("SELECT"); + exec(request, StatusCode::BadRequest) + } + + #[test] + fn post_unknown_query() { + let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap()); + request + .insert_header("Content-Type", "application/sparql-todo") + .unwrap(); + request.set_body("SELECT"); + exec(request, StatusCode::UnsupportedMediaType) + } + + fn exec(request: Request, expected_status: StatusCode) { + let repository = Arc::new(MemoryRepository::default()); + assert_eq!( + match block_on(handle_request(request, Arc::clone(&repository))) { + Ok(r) => r.status(), + Err(e) => e.status(), + }, + expected_status + ); } }