Upgrades to async-h1 2

pull/35/head
Tpt 5 years ago
parent 4c7936e5be
commit 0f8b4eddd2
  1. 4
      server/Cargo.toml
  2. 69
      server/src/main.rs
  3. 6
      wikibase/Cargo.toml
  4. 46
      wikibase/src/loader.rs
  5. 37
      wikibase/src/main.rs

@ -13,7 +13,7 @@ edition = "2018"
[dependencies]
argh = "0.1"
async-std = { version = "1", features = ["attributes"] }
async-h1 = "1"
http-types = "1"
async-h1 = "2"
http-types = "2"
oxigraph = { path = "../lib", features = ["rocksdb"] }
url = "2"

@ -15,7 +15,6 @@ use async_std::io::{BufRead, Read};
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task::{block_on, spawn, spawn_blocking};
use http_types::headers::HeaderName;
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
use oxigraph::{DatasetSyntax, FileSyntax, GraphSyntax, RocksDbStore};
@ -44,7 +43,7 @@ pub async fn main() -> Result<()> {
let store = RocksDbStore::open(args.file)?;
println!("Listening for requests at http://{}", &args.bind);
http_server(args.bind, move |request| {
http_server(&args.bind, move |request| {
handle_request(request, store.clone())
})
.await
@ -54,17 +53,17 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
let mut response = match (request.url().path(), request.method()) {
("/", Method::Get) => {
let mut response = Response::new(StatusCode::Ok);
response.append_header(headers::CONTENT_TYPE, "text/html")?;
response.append_header(headers::CONTENT_TYPE, "text/html");
response.set_body(HTML_ROOT_PAGE);
response
}
("/", Method::Post) => {
if let Some(content_type) = request.content_type() {
match if let Some(format) = GraphSyntax::from_mime_type(essence(&content_type)) {
match if let Some(format) = GraphSyntax::from_mime_type(content_type.essence()) {
spawn_blocking(move || {
store.load_graph(SyncAsyncBufReader::from(request), format, None, None)
})
} else if let Some(format) = DatasetSyntax::from_mime_type(essence(&content_type)) {
} else if let Some(format) = DatasetSyntax::from_mime_type(content_type.essence()) {
spawn_blocking(move || {
store.load_dataset(SyncAsyncBufReader::from(request), format, None)
})
@ -97,7 +96,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
}
("/query", Method::Post) => {
if let Some(content_type) = request.content_type() {
if essence(&content_type) == "application/sparql-query" {
if content_type.essence() == "application/sparql-query" {
let mut buffer = String::new();
let mut request = request;
request
@ -106,7 +105,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
.read_to_string(&mut buffer)
.await?;
evaluate_sparql_query(store, buffer, request).await?
} else if essence(&content_type) == "application/x-www-form-urlencoded" {
} else if content_type.essence() == "application/x-www-form-urlencoded" {
let mut buffer = Vec::new();
let mut request = request;
request
@ -127,15 +126,10 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
}
_ => Response::new(StatusCode::NotFound),
};
response.append_header("Server", SERVER)?;
response.append_header(headers::SERVER, SERVER);
Ok(response)
}
/// TODO: bad hack to overcome http_types limitations
fn essence(mime: &Mime) -> &str {
mime.essence().split(';').next().unwrap_or("")
}
fn simple_response(status: StatusCode, body: impl Into<Body>) -> Response {
let mut response = Response::new(status);
response.set_body(body);
@ -183,7 +177,7 @@ async fn evaluate_sparql_query(
)?;
let mut response = Response::from(results.write_graph(Vec::default(), format)?);
response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
response.insert_header(headers::CONTENT_TYPE, format.media_type());
Ok(response)
} else {
let format = content_negotiation(
@ -194,7 +188,7 @@ async fn evaluate_sparql_query(
],
)?;
let mut response = Response::from(results.write(Vec::default(), format)?);
response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
response.insert_header(headers::CONTENT_TYPE, format.media_type());
Ok(response)
}
})
@ -205,15 +199,14 @@ async fn http_server<
F: Clone + Send + Sync + 'static + Fn(Request) -> Fut,
Fut: Send + Future<Output = Result<Response>>,
>(
host: String,
host: &str,
handle: F,
) -> Result<()> {
async fn accept<F: Fn(Request) -> Fut, Fut: Future<Output = Result<Response>>>(
addr: String,
stream: TcpStream,
handle: F,
) -> Result<()> {
async_h1::accept(&addr, stream, |request| async {
async_h1::accept(stream, |request| async {
Ok(match handle(request).await {
Ok(result) => result,
Err(error) => simple_response(error.status(), error.to_string()),
@ -222,14 +215,13 @@ async fn http_server<
.await
}
let listener = TcpListener::bind(&host).await?;
let listener = TcpListener::bind(host).await?;
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?.clone(); //TODO: clone stream?
let stream = stream?;
let handle = handle.clone();
let addr = format!("http://{}", host);
spawn(async {
if let Err(err) = accept(addr, stream, handle).await {
if let Err(err) = accept(stream, handle).await {
eprintln!("{}", err);
};
});
@ -239,9 +231,8 @@ async fn http_server<
fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> Result<F> {
let header = request
.header(&HeaderName::from_str("Accept").unwrap())
.and_then(|h| h.last())
.map(|h| h.as_str().trim())
.header(headers::ACCEPT)
.map(|h| h.last().as_str().trim())
.unwrap_or("");
let supported: Vec<Mime> = supported
.iter()
@ -255,7 +246,7 @@ fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> R
for possible in header.split(',') {
let possible = Mime::from_str(possible.trim())?;
let score = if let Some(q) = possible.param("q") {
f32::from_str(q)?
f32::from_str(&q.to_string())?
} else {
1.
};
@ -274,7 +265,7 @@ fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> R
}
}
F::from_mime_type(essence(result))
F::from_mime_type(result.essence())
.ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"))
}
@ -301,7 +292,7 @@ impl<R: BufRead + Unpin> std::io::BufRead for SyncAsyncBufReader<R> {
unimplemented!()
}
fn consume(&mut self, amt: usize) {
fn consume(&mut self, _: usize) {
unimplemented!()
}
@ -336,9 +327,7 @@ mod tests {
#[test]
fn post_file() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap());
request
.insert_header("Content-Type", "text/turtle")
.unwrap();
request.insert_header("Content-Type", "text/turtle");
request.set_body("<http://example.com> <http://example.com> <http://example.com> .");
exec(request, StatusCode::NoContent)
}
@ -346,9 +335,7 @@ mod tests {
#[test]
fn post_wrong_file() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap());
request
.insert_header("Content-Type", "text/turtle")
.unwrap();
request.insert_header("Content-Type", "text/turtle");
request.set_body("<http://example.com>");
exec(request, StatusCode::BadRequest)
}
@ -356,7 +343,7 @@ mod tests {
#[test]
fn post_unsupported_file() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap());
request.insert_header("Content-Type", "text/plain").unwrap();
request.insert_header("Content-Type", "text/plain");
exec(request, StatusCode::UnsupportedMediaType)
}
@ -396,9 +383,7 @@ mod tests {
#[test]
fn post_query() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap());
request
.insert_header("Content-Type", "application/sparql-query")
.unwrap();
request.insert_header("Content-Type", "application/sparql-query");
request.set_body("SELECT * WHERE { ?s ?p ?o }");
exec(request, StatusCode::Ok)
}
@ -406,9 +391,7 @@ mod tests {
#[test]
fn post_bad_query() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap());
request
.insert_header("Content-Type", "application/sparql-query")
.unwrap();
request.insert_header("Content-Type", "application/sparql-query");
request.set_body("SELECT");
exec(request, StatusCode::BadRequest)
}
@ -416,9 +399,7 @@ mod tests {
#[test]
fn post_unknown_query() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap());
request
.insert_header("Content-Type", "application/sparql-todo")
.unwrap();
request.insert_header("Content-Type", "application/sparql-todo");
request.set_body("SELECT");
exec(request, StatusCode::UnsupportedMediaType)
}

@ -12,11 +12,11 @@ edition = "2018"
[dependencies]
argh = "0.1"
async-native-tls = "0.3"
async-std = { version = "1", features = ["attributes"] }
async-h1 = "1"
async-h1 = "2"
chrono = "0.4"
http-client = { version = "2.0", features = ["h1_client"] }
http-types = "1"
http-types = "2"
oxigraph = { path = "../lib", features = ["rocksdb"] }
serde_json = "1"
url = "2"

@ -1,10 +1,9 @@
use crate::SERVER;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task::block_on;
use chrono::{DateTime, Datelike, Utc};
use http_client::h1::H1Client;
use http_client::HttpClient;
use http_types::{Method, Request, Result};
use http_types::{headers, Error, Method, Request, Response, Result, StatusCode};
use oxigraph::model::NamedNode;
use oxigraph::{GraphSyntax, RocksDbStore};
use serde_json::Value;
@ -18,7 +17,6 @@ pub struct WikibaseLoader {
store: RocksDbStore,
api_url: Url,
entity_data_url: Url,
client: H1Client,
namespaces: Vec<u32>,
slot: Option<String>,
frequency: Duration,
@ -38,7 +36,6 @@ impl WikibaseLoader {
store,
api_url: Url::parse(api_url)?,
entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?,
client: H1Client::new(),
namespaces: namespaces.to_vec(),
slot: slot.map(|t| t.to_owned()),
start: Utc::now(),
@ -218,16 +215,49 @@ impl WikibaseLoader {
}
let url = url.join(&("?".to_owned() + &query_serializer.finish()))?;
let mut request = Request::new(Method::Get, url);
request.append_header("user-agent", SERVER)?;
let response = self.client.send(request);
request.append_header(headers::SERVER, SERVER);
block_on(async {
let mut response = response.await?;
let mut response = self.request(request).await?;
let mut buffer = Vec::new();
response.read_to_end(&mut buffer).await?;
Ok(buffer)
})
}
async fn request(&self, request: Request) -> Result<Response> {
let addr = request
.url()
.socket_addrs(|| None)?
.into_iter()
.next()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing valid address"))?;
Ok(match request.url().scheme() {
"http" => {
let stream = TcpStream::connect(addr).await?;
async_h1::connect(stream, request).await
}
"https" => {
let stream = async_native_tls::connect(
request
.url()
.host_str()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing host"))?,
TcpStream::connect(addr).await?,
)
.await?;
async_h1::connect(stream, request).await
}
_ => {
return Err(Error::from_str(
StatusCode::BadRequest,
"missing valid address",
))
}
}?)
}
fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
let graph_name = NamedNode::parse(uri)?.into();
self.store.transaction(|transaction| {

@ -15,7 +15,6 @@ use async_std::future::Future;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task::{spawn, spawn_blocking};
use http_types::headers::HeaderName;
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
use oxigraph::{FileSyntax, GraphSyntax, RocksDbStore};
@ -95,7 +94,7 @@ pub async fn main() -> Result<()> {
println!("Listening for requests at http://{}", &args.bind);
http_server(args.bind, move |request| {
http_server(&args.bind, move |request| {
handle_request(request, store.clone())
})
.await
@ -113,7 +112,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
}
("/query", Method::Post) => {
if let Some(content_type) = request.content_type() {
if essence(&content_type) == "application/sparql-query" {
if content_type.essence() == "application/sparql-query" {
let mut buffer = String::new();
let mut request = request;
request
@ -122,7 +121,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
.read_to_string(&mut buffer)
.await?;
evaluate_sparql_query(store, buffer, request).await?
} else if essence(&content_type) == "application/x-www-form-urlencoded" {
} else if content_type.essence() == "application/x-www-form-urlencoded" {
let mut buffer = Vec::new();
let mut request = request;
request
@ -143,15 +142,10 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
}
_ => Response::new(StatusCode::NotFound),
};
response.append_header("Server", SERVER)?;
response.append_header("Server", SERVER);
Ok(response)
}
/// TODO: bad hack to overcome http_types limitations
fn essence(mime: &Mime) -> &str {
mime.essence().split(';').next().unwrap_or("")
}
fn simple_response(status: StatusCode, body: impl Into<Body>) -> Response {
let mut response = Response::new(status);
response.set_body(body);
@ -199,7 +193,7 @@ async fn evaluate_sparql_query(
)?;
let mut response = Response::from(results.write_graph(Vec::default(), format)?);
response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
response.insert_header(headers::CONTENT_TYPE, format.media_type());
Ok(response)
} else {
let format = content_negotiation(
@ -210,7 +204,7 @@ async fn evaluate_sparql_query(
],
)?;
let mut response = Response::from(results.write(Vec::default(), format)?);
response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
response.insert_header(headers::CONTENT_TYPE, format.media_type());
Ok(response)
}
})
@ -221,15 +215,14 @@ async fn http_server<
F: Clone + Send + Sync + 'static + Fn(Request) -> Fut,
Fut: Send + Future<Output = Result<Response>>,
>(
host: String,
host: &str,
handle: F,
) -> Result<()> {
async fn accept<F: Fn(Request) -> Fut, Fut: Future<Output = Result<Response>>>(
addr: String,
stream: TcpStream,
handle: F,
) -> Result<()> {
async_h1::accept(&addr, stream, |request| async {
async_h1::accept(stream, |request| async {
Ok(match handle(request).await {
Ok(result) => result,
Err(error) => simple_response(error.status(), error.to_string()),
@ -241,11 +234,10 @@ async fn http_server<
let listener = TcpListener::bind(&host).await?;
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?.clone(); //TODO: clone stream?
let stream = stream?;
let handle = handle.clone();
let addr = format!("http://{}", host);
spawn(async {
if let Err(err) = accept(addr, stream, handle).await {
if let Err(err) = accept(stream, handle).await {
eprintln!("{}", err);
};
});
@ -255,9 +247,8 @@ async fn http_server<
fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> Result<F> {
let header = request
.header(&HeaderName::from_str("Accept").unwrap())
.and_then(|h| h.last())
.map(|h| h.as_str().trim())
.header(headers::ACCEPT)
.map(|h| h.last().as_str().trim())
.unwrap_or("");
let supported: Vec<Mime> = supported
.iter()
@ -271,7 +262,7 @@ fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> R
for possible in header.split(',') {
let possible = Mime::from_str(possible.trim())?;
let score = if let Some(q) = possible.param("q") {
f32::from_str(q)?
f32::from_str(&q.to_string())?
} else {
1.
};
@ -290,6 +281,6 @@ fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> R
}
}
F::from_mime_type(essence(result))
F::from_mime_type(result.essence())
.ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"))
}

Loading…
Cancel
Save