Uses async_h1 in Oxigraph server

pull/35/head
Tpt 5 years ago
parent 3751286734
commit bf35eec82f
  1. 10
      bench/bsbm_oxigraph.sh
  2. 5
      server/Cargo.toml
  3. 513
      server/src/main.rs

@ -1,19 +1,19 @@
#!/usr/bin/env bash #!/usr/bin/env bash
DATASET_SIZE=100000 # number of products in the dataset. There is around 350 triples generated by product. DATASET_SIZE=100000 # number of products in the dataset. There is around 350 triples generated by product.
MEMORY_SIZE=1000000 # availlable memory for Oxigraph in GB. Useful to simulate low RAM machines. MEMORY_SIZE=1000000 # available memory for Oxigraph in KB. Useful to simulate low RAM machines.
cd bsbm-tools cd bsbm-tools
./generate -fc -pc ${DATASET_SIZE} -s nt -fn "explore-${DATASET_SIZE}" ./generate -fc -pc ${DATASET_SIZE} -s nt -fn "explore-${DATASET_SIZE}"
cargo build --release --manifest-path="../../server/Cargo.toml" cargo build --release --manifest-path="../../server/Cargo.toml"
( (
ulimit -d ${MEMORY_SIZE} ulimit -d ${MEMORY_SIZE}
./../../target/release/oxigraph_server --file oxigraph_data ./../../target/release/oxigraph_server --file oxigraph_data --bind 127.0.0.1:7878
) & ) &
sleep 5 sleep 5
curl -f -X POST -H 'Content-Type:application/n-triples' --data-binary "@explore-${DATASET_SIZE}.nt" http://localhost:7878/ curl -f -X POST -H 'Content-Type:application/n-triples' --data-binary "@explore-${DATASET_SIZE}.nt" http://127.0.0.1:7878/
./testdriver -ucf usecases/explore/sparql.txt -o "../bsbm.explore.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" http://localhost:7878/query ./testdriver -ucf usecases/explore/sparql.txt -o "../bsbm.explore.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" http://127.0.0.1:7878/query
./testdriver -ucf usecases/businessIntelligence/sparql.txt -o "../bsbm.businessIntelligence.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" http://localhost:7878/query ./testdriver -ucf usecases/businessIntelligence/sparql.txt -o "../bsbm.businessIntelligence.oxigraph.${DATASET_SIZE}.${MEMORY_SIZE}.$(date +'%Y-%m-%d').xml" "http://127.0.0.1:7878/query"
kill $! kill $!
rm -r oxigraph_data rm -r oxigraph_data
rm "explore-${DATASET_SIZE}.nt" rm "explore-${DATASET_SIZE}.nt"

@ -12,5 +12,8 @@ edition = "2018"
[dependencies] [dependencies]
oxigraph = { path = "../lib", features = ["rocksdb"] } oxigraph = { path = "../lib", features = ["rocksdb"] }
async-std = { version = "1", features = ["attributes"] }
async-h1 = "1"
clap = "2" clap = "2"
rouille = "3" http-types = "1"
url = "2"

@ -9,25 +9,31 @@
unused_qualifications unused_qualifications
)] )]
use async_std::future::Future;
use async_std::io::{BufRead, Read};
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::task::{block_on, spawn, spawn_blocking};
use clap::App; use clap::App;
use clap::Arg; use clap::Arg;
use clap::ArgMatches; use clap::ArgMatches;
use http_types::headers::HeaderName;
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
use oxigraph::{ use oxigraph::{
DatasetSyntax, FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection, DatasetSyntax, FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection,
RocksDbRepository, RocksDbRepository,
}; };
use rouille::input::priority_header_preferred; use std::str::FromStr;
use rouille::url::form_urlencoded; use url::form_urlencoded;
use rouille::{content_encoding, start_server, Request, Response};
use std::io::{BufReader, Read};
use std::sync::Arc;
const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576; const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576;
const HTML_ROOT_PAGE: &str = include_str!("../templates/query.html"); const HTML_ROOT_PAGE: &str = include_str!("../templates/query.html");
const SERVER: &str = concat!("Oxigraph/", env!("CARGO_PKG_VERSION")); const SERVER: &str = concat!("Oxigraph/", env!("CARGO_PKG_VERSION"));
pub fn main() { #[async_std::main]
pub async fn main() -> Result<()> {
let matches = App::new("Oxigraph SPARQL server") let matches = App::new("Oxigraph SPARQL server")
.arg( .arg(
Arg::with_name("bind") Arg::with_name("bind")
@ -48,217 +54,428 @@ pub fn main() {
let file = matches.value_of("file").map(|v| v.to_string()); let file = matches.value_of("file").map(|v| v.to_string());
if let Some(file) = file { if let Some(file) = file {
main_with_dataset(Arc::new(RocksDbRepository::open(file).unwrap()), &matches) main_with_dataset(Arc::new(RocksDbRepository::open(file)?), &matches).await
} else { } else {
main_with_dataset(Arc::new(MemoryRepository::default()), &matches) main_with_dataset(Arc::new(MemoryRepository::default()), &matches).await
} }
} }
fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, matches: &ArgMatches<'_>) async fn main_with_dataset<R: Send + Sync + 'static>(
repository: Arc<R>,
matches: &ArgMatches<'_>,
) -> Result<()>
where where
for<'a> &'a R: Repository, for<'a> &'a R: Repository,
{ {
let addr = matches.value_of("bind").unwrap().to_owned(); let addr = matches.value_of("bind").unwrap().to_owned();
println!("Listening for requests at http://{}", &addr); println!("Listening for requests at http://{}", &addr);
start_server(addr, move |request| { http_server(addr, move |request| {
content_encoding::apply( handle_request(request, Arc::clone(&repository))
request,
handle_request(request, repository.connection().unwrap()),
)
.with_unique_header("Server", SERVER)
}) })
.await
} }
fn handle_request<R: RepositoryConnection>(request: &Request, mut connection: R) -> Response { async fn handle_request<R: Send + Sync + 'static>(
match (request.url().as_str(), request.method()) { request: Request,
("/", "GET") => Response::html(HTML_ROOT_PAGE), repository: Arc<R>,
("/", "POST") => { ) -> Result<Response>
if let Some(body) = request.data() { where
if let Some(content_type) = request.header("Content-Type") { for<'a> &'a R: Repository,
match if let Some(format) = GraphSyntax::from_mime_type(content_type) { {
connection.load_graph(BufReader::new(body), format, None, None) let mut response = match (request.url().path(), request.method()) {
} else if let Some(format) = DatasetSyntax::from_mime_type(content_type) { ("/", Method::Get) => {
connection.load_dataset(BufReader::new(body), format, None) let mut response = Response::new(StatusCode::Ok);
} else { response.append_header(headers::CONTENT_TYPE, "text/html")?;
return Response::text(format!( response.set_body(HTML_ROOT_PAGE);
"No supported content Content-Type given: {}", response
content_type
))
.with_status_code(415);
} {
Ok(()) => Response::empty_204(),
Err(error) => Response::text(error.to_string()).with_status_code(400),
} }
("/", Method::Post) => {
if let Some(content_type) = request.content_type() {
match if let Some(format) = GraphSyntax::from_mime_type(essence(&content_type)) {
spawn_blocking(move || {
repository.connection()?.load_graph(
SyncAsyncBufReader::from(request),
format,
None,
None,
)
})
} else if let Some(format) = DatasetSyntax::from_mime_type(essence(&content_type)) {
spawn_blocking(move || {
repository.connection()?.load_dataset(
SyncAsyncBufReader::from(request),
format,
None,
)
})
} else { } else {
Response::text("No Content-Type given").with_status_code(400) return Ok(simple_response(
StatusCode::UnsupportedMediaType,
format!("No supported content Content-Type given: {}", content_type),
));
}
.await
{
Ok(()) => Response::new(StatusCode::NoContent),
Err(error) => {
let mut error = Error::from(error);
error.set_status(StatusCode::BadRequest);
return Err(error);
}
} }
} else { } else {
Response::text("No content given").with_status_code(400) simple_response(StatusCode::BadRequest, "No Content-Type given")
} }
} }
("/query", "GET") => evaluate_urlencoded_sparql_query( ("/query", Method::Get) => {
connection, evaluate_urlencoded_sparql_query(
request.raw_query_string().as_bytes(), repository,
request.url().query().unwrap_or("").as_bytes().to_vec(),
request, request,
), )
("/query", "POST") => { .await?
if let Some(body) = request.data() { }
if let Some(content_type) = request.header("Content-Type") { ("/query", Method::Post) => {
if content_type.starts_with("application/sparql-query") { if let Some(content_type) = request.content_type() {
let mut buffer = String::default(); if essence(&content_type) == "application/sparql-query" {
body.take(MAX_SPARQL_BODY_SIZE) let mut buffer = String::new();
let mut request = request;
request
.take_body()
.take(MAX_SPARQL_BODY_SIZE)
.read_to_string(&mut buffer) .read_to_string(&mut buffer)
.unwrap(); .await?;
evaluate_sparql_query(connection, &buffer, request) evaluate_sparql_query(repository, buffer, request).await?
} else if content_type.starts_with("application/x-www-form-urlencoded") { } else if essence(&content_type) == "application/x-www-form-urlencoded" {
let mut buffer = Vec::default(); let mut buffer = Vec::new();
body.take(MAX_SPARQL_BODY_SIZE) let mut request = request;
request
.take_body()
.take(MAX_SPARQL_BODY_SIZE)
.read_to_end(&mut buffer) .read_to_end(&mut buffer)
.unwrap(); .await?;
evaluate_urlencoded_sparql_query(connection, &buffer, request) evaluate_urlencoded_sparql_query(repository, buffer, request).await?
} else { } else {
Response::text(format!( simple_response(
"No supported content Content-Type given: {}", StatusCode::UnsupportedMediaType,
content_type format!("No supported Content-Type given: {}", content_type),
)) )
.with_status_code(415)
} }
} else { } else {
Response::text("No Content-Type given").with_status_code(400) simple_response(StatusCode::BadRequest, "No Content-Type given")
} }
} else {
Response::text("No content given").with_status_code(400)
} }
_ => Response::new(StatusCode::NotFound),
};
response.append_header("Server", SERVER)?;
Ok(response)
} }
_ => Response::empty_404(),
/// TODO: bad hack to overcome http_types limitations
fn essence(mime: &Mime) -> &str {
mime.essence().split(';').next().unwrap_or("")
} }
fn simple_response(status: StatusCode, body: impl Into<Body>) -> Response {
let mut response = Response::new(status);
response.set_body(body);
response
} }
fn evaluate_urlencoded_sparql_query<R: RepositoryConnection>( async fn evaluate_urlencoded_sparql_query<R: Send + Sync + 'static>(
connection: R, repository: Arc<R>,
encoded: &[u8], encoded: Vec<u8>,
request: &Request, request: Request,
) -> Response { ) -> Result<Response>
if let Some((_, query)) = form_urlencoded::parse(encoded).find(|(k, _)| k == "query") { where
evaluate_sparql_query(connection, &query, request) for<'a> &'a R: Repository,
{
if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") {
evaluate_sparql_query(repository, query.to_string(), request).await
} else { } else {
Response::text("You should set the 'query' parameter").with_status_code(400) Ok(simple_response(
StatusCode::BadRequest,
"You should set the 'query' parameter",
))
} }
} }
fn evaluate_sparql_query<R: RepositoryConnection>( async fn evaluate_sparql_query<R: Send + Sync + 'static>(
connection: R, repository: Arc<R>,
query: &str, query: String,
request: &Request, request: Request,
) -> Response { ) -> Result<Response>
where
for<'a> &'a R: Repository,
{
spawn_blocking(move || {
//TODO: stream //TODO: stream
match connection.prepare_query(query, QueryOptions::default()) { let query = repository
Ok(query) => { .connection()?
let results = query.exec().unwrap(); .prepare_query(&query, QueryOptions::default())
.map_err(|e| {
let mut e = Error::from(e);
e.set_status(StatusCode::BadRequest);
e
})?;
let results = query.exec()?;
if let QueryResult::Graph(_) = results { if let QueryResult::Graph(_) = results {
let supported_formats = [ let format = content_negotiation(
request,
&[
GraphSyntax::NTriples.media_type(), GraphSyntax::NTriples.media_type(),
GraphSyntax::Turtle.media_type(), GraphSyntax::Turtle.media_type(),
GraphSyntax::RdfXml.media_type(), GraphSyntax::RdfXml.media_type(),
]; ],
let format = if let Some(accept) = request.header("Accept") { )?;
if let Some(media_type) =
priority_header_preferred(accept, supported_formats.iter().cloned())
.and_then(|p| GraphSyntax::from_mime_type(supported_formats[p]))
{
media_type
} else {
return Response::text(format!(
"No supported Accept given: {}. Supported format: {:?}",
accept, supported_formats
))
.with_status_code(415);
}
} else {
GraphSyntax::NTriples
};
Response::from_data( let mut response = Response::from(results.write_graph(Vec::default(), format)?);
format.media_type(), response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
results.write_graph(Vec::default(), format).unwrap(), Ok(response)
)
} else { } else {
let supported_formats = [ let format = content_negotiation(
request,
&[
QueryResultSyntax::Xml.media_type(), QueryResultSyntax::Xml.media_type(),
QueryResultSyntax::Json.media_type(), QueryResultSyntax::Json.media_type(),
]; ],
let format = if let Some(accept) = request.header("Accept") { )?;
if let Some(media_type) = let mut response = Response::from(results.write(Vec::default(), format)?);
priority_header_preferred(accept, supported_formats.iter().cloned()) response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
.and_then(|p| QueryResultSyntax::from_mime_type(supported_formats[p])) Ok(response)
{ }
media_type })
} else { .await
return Response::text(format!( }
"No supported Accept given: {}. Supported format: {:?}",
accept, supported_formats async fn http_server<
)) F: Clone + Send + Sync + 'static + Fn(Request) -> Fut,
.with_status_code(415); Fut: Send + Future<Output = Result<Response>>,
>(
host: String,
handle: F,
) -> Result<()> {
async fn accept<F: Fn(Request) -> Fut, Fut: Future<Output = Result<Response>>>(
addr: String,
stream: TcpStream,
handle: F,
) -> Result<()> {
async_h1::accept(&addr, stream, |request| async {
Ok(match handle(request).await {
Ok(result) => result,
Err(error) => simple_response(error.status(), error.to_string()),
})
})
.await
} }
let listener = TcpListener::bind(&host).await?;
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?.clone(); //TODO: clone stream?
let handle = handle.clone();
let addr = format!("http://{}", host);
spawn(async {
if let Err(err) = accept(addr, stream, handle).await {
eprintln!("{}", err);
};
});
}
Ok(())
}
fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> Result<F> {
let header = request
.header(&HeaderName::from_str("Accept").unwrap())
.and_then(|h| h.last())
.map(|h| h.as_str().trim())
.unwrap_or("");
let supported: Vec<Mime> = supported
.iter()
.map(|h| Mime::from_str(h).unwrap())
.collect();
let mut result = supported.first().unwrap();
let mut result_score = 0f32;
if !header.is_empty() {
for possible in header.split(',') {
let possible = Mime::from_str(possible.trim())?;
let score = if let Some(q) = possible.param("q") {
f32::from_str(q)?
} else { } else {
QueryResultSyntax::Json 1.
}; };
if score <= result_score {
continue;
}
for candidate in &supported {
if (possible.basetype() == candidate.basetype() || possible.basetype() == "*")
&& (possible.subtype() == candidate.subtype() || possible.subtype() == "*")
{
result = candidate;
result_score = score;
break;
}
}
}
}
Response::from_data( F::from_mime_type(essence(result))
format.media_type(), .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"))
results.write(Vec::default(), format).unwrap(),
)
} }
struct SyncAsyncBufReader<R: Unpin> {
inner: R,
} }
Err(error) => Response::text(error.to_string()).with_status_code(400),
impl<R: Unpin> From<R> for SyncAsyncBufReader<R> {
fn from(inner: R) -> Self {
Self { inner }
}
}
impl<R: Read + Unpin> std::io::Read for SyncAsyncBufReader<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
block_on(self.inner.read(buf))
}
//TODO: implement other methods
}
impl<R: BufRead + Unpin> std::io::BufRead for SyncAsyncBufReader<R> {
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
unimplemented!()
}
fn consume(&mut self, amt: usize) {
unimplemented!()
}
fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
block_on(self.inner.read_until(byte, buf))
}
fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> {
block_on(self.inner.read_line(buf))
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::handle_request; use crate::handle_request;
use oxigraph::{MemoryRepository, Repository}; use async_std::sync::Arc;
use rouille::Request; use async_std::task::block_on;
use std::io::Read; use http_types::{Method, Request, StatusCode, Url};
use oxigraph::MemoryRepository;
#[test] #[test]
fn get_ui() { fn get_ui() {
exec(Request::fake_http("GET", "/", vec![], vec![])) exec(
Request::new(Method::Get, Url::parse("http://localhost/").unwrap()),
StatusCode::Ok,
)
}
#[test]
fn post_file() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap());
request
.insert_header("Content-Type", "text/turtle")
.unwrap();
request.set_body("<http://example.com> <http://example.com> <http://example.com> .");
exec(request, StatusCode::NoContent)
}
#[test]
fn post_wrong_file() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap());
request
.insert_header("Content-Type", "text/turtle")
.unwrap();
request.set_body("<http://example.com>");
exec(request, StatusCode::BadRequest)
}
#[test]
fn post_unsupported_file() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap());
request.insert_header("Content-Type", "text/plain").unwrap();
exec(request, StatusCode::UnsupportedMediaType)
} }
#[test] #[test]
fn get_query() { fn get_query() {
exec(Request::fake_http( exec(
"GET", Request::new(
"/query?query=SELECT+*+WHERE+{+?s+?p+?o+}", Method::Get,
vec![( Url::parse(
"Content-Type".to_string(), "http://localhost/query?query=SELECT%20*%20WHERE%20{%20?s%20?p%20?o%20}",
"application/sparql-query".to_string(), )
)], .unwrap(),
b"SELECT * WHERE { ?s ?p ?o }".to_vec(), ),
)) StatusCode::Ok,
);
}
#[test]
fn get_bad_query() {
exec(
Request::new(
Method::Get,
Url::parse("http://localhost/query?query=SELECT").unwrap(),
),
StatusCode::BadRequest,
);
}
#[test]
fn get_without_query() {
exec(
Request::new(Method::Get, Url::parse("http://localhost/query").unwrap()),
StatusCode::BadRequest,
);
} }
#[test] #[test]
fn post_query() { fn post_query() {
exec(Request::fake_http( let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap());
"POST", request
"/query", .insert_header("Content-Type", "application/sparql-query")
vec![( .unwrap();
"Content-Type".to_string(), request.set_body("SELECT * WHERE { ?s ?p ?o }");
"application/sparql-query".to_string(), exec(request, StatusCode::Ok)
)], }
b"SELECT * WHERE { ?s ?p ?o }".to_vec(),
)) #[test]
fn post_bad_query() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap());
request
.insert_header("Content-Type", "application/sparql-query")
.unwrap();
request.set_body("SELECT");
exec(request, StatusCode::BadRequest)
} }
fn exec(request: Request) { #[test]
let response = handle_request(&request, MemoryRepository::default().connection().unwrap()); fn post_unknown_query() {
let mut body = String::default(); let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap());
request request
.data() .insert_header("Content-Type", "application/sparql-todo")
.map(|mut r| r.read_to_string(&mut body).unwrap()); .unwrap();
assert_eq!(response.status_code, 200, "{}", body); request.set_body("SELECT");
exec(request, StatusCode::UnsupportedMediaType)
}
fn exec(request: Request, expected_status: StatusCode) {
let repository = Arc::new(MemoryRepository::default());
assert_eq!(
match block_on(handle_request(request, Arc::clone(&repository))) {
Ok(r) => r.status(),
Err(e) => e.status(),
},
expected_status
);
} }
} }

Loading…
Cancel
Save